diff --git a/swh/provenance/__init__.py b/swh/provenance/__init__.py index 6bcba78..da52aa0 100644 --- a/swh/provenance/__init__.py +++ b/swh/provenance/__init__.py @@ -1,40 +1,32 @@ from typing import TYPE_CHECKING from .postgresql.db_utils import connect if TYPE_CHECKING: from swh.provenance.archive import ArchiveInterface from swh.provenance.provenance import ProvenanceInterface def get_archive(cls: str, **kwargs) -> "ArchiveInterface": if cls == "api": from swh.provenance.storage.archive import ArchiveStorage from swh.storage import get_storage return ArchiveStorage(get_storage(**kwargs["storage"])) elif cls == "direct": from swh.provenance.postgresql.archive import ArchivePostgreSQL return ArchivePostgreSQL(connect(kwargs["db"])) else: raise NotImplementedError def get_provenance(cls: str, **kwargs) -> "ProvenanceInterface": if cls == "local": conn = connect(kwargs["db"]) - if kwargs.get("with_path", True): - from swh.provenance.postgresql.provenancedb_with_path import ( - ProvenanceWithPathDB, - ) - - return ProvenanceWithPathDB(conn) - else: - from swh.provenance.postgresql.provenancedb_without_path import ( - ProvenanceWithoutPathDB, - ) - - return ProvenanceWithoutPathDB(conn) + with_path = kwargs.get("with_path", True) + from swh.provenance.provenance import ProvenanceBackend + + return ProvenanceBackend(conn, with_path=with_path) else: raise NotImplementedError diff --git a/swh/provenance/postgresql/provenancedb_base.py b/swh/provenance/postgresql/provenancedb_base.py index 6a62540..33319c8 100644 --- a/swh/provenance/postgresql/provenancedb_base.py +++ b/swh/provenance/postgresql/provenancedb_base.py @@ -1,233 +1,167 @@ from datetime import datetime import itertools import logging -from typing import Any, Dict, Iterable, List, Optional +from typing import Any, Dict, Generator, List, Optional, Set, Tuple import psycopg2 import psycopg2.extras -from ..model import DirectoryEntry, FileEntry -from ..origin import OriginEntry -from ..revision import RevisionEntry - class ProvenanceDBBase: - raise_on_commit: bool = False - def __init__(self, conn: psycopg2.extensions.connection): conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) conn.set_session(autocommit=True) self.conn = conn self.cursor = self.conn.cursor() # XXX: not sure this is the best place to do it! 
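For context on the reworked factories in swh/provenance/__init__.py above, a minimal usage sketch; the storage and db parameter dicts are placeholders (this diff does not pin their exact shape), assuming connect() accepts a libpq-style parameter dict:

from swh.provenance import get_archive, get_provenance

# "api" archive: wraps a swh.storage instance built via get_storage()
archive = get_archive(
    "api", storage={"cls": "remote", "url": "http://localhost:5002/"}
)
# "local" provenance: with_path selects the with-path or without-path DB flavour
# behind the new ProvenanceBackend facade introduced later in this diff.
provenance = get_provenance("local", db={"dbname": "provenance"}, with_path=True)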
self.cursor.execute("SET timezone TO 'UTC'") - self.write_cache: Dict[str, Any] = {} - self.read_cache: Dict[str, Any] = {} - self.clear_caches() - - def clear_caches(self): - self.write_cache = { - "content": dict(), - "content_early_in_rev": set(), - "content_in_dir": set(), - "directory": dict(), - "directory_in_rev": set(), - "revision": dict(), - "revision_before_rev": list(), - "revision_in_org": list(), - } - self.read_cache = {"content": dict(), "directory": dict(), "revision": dict()} - - def commit(self): + + def commit(self, data: Dict[str, Any], raise_on_commit: bool = False) -> bool: try: - self.insert_all() - self.clear_caches() + # First insert entities + self.insert_entity("content", data["content"]) + self.insert_entity("directory", data["directory"]) + self.insert_entity("revision", data["revision"]) + + # Relations should come after ids for entities were resolved + self.insert_relation( + "content", + "revision", + "content_early_in_rev", + data["content_early_in_rev"], + ) + self.insert_relation( + "content", "directory", "content_in_dir", data["content_in_dir"] + ) + self.insert_relation( + "directory", "revision", "directory_in_rev", data["directory_in_rev"] + ) + + # TODO: this should be updated when origin-revision layer gets properly + # updated. + # if data["revision_before_rev"]: + # psycopg2.extras.execute_values( + # self.cursor, + # """ + # LOCK TABLE ONLY revision_before_rev; + # INSERT INTO revision_before_rev VALUES %s + # ON CONFLICT DO NOTHING + # """, + # data["revision_before_rev"], + # ) + # data["revision_before_rev"].clear() + # + # if data["revision_in_org"]: + # psycopg2.extras.execute_values( + # self.cursor, + # """ + # LOCK TABLE ONLY revision_in_org; + # INSERT INTO revision_in_org VALUES %s + # ON CONFLICT DO NOTHING + # """, + # data["revision_in_org"], + # ) + # data["revision_in_org"].clear() + return True except: # noqa: E722 # Unexpected error occurred, rollback all changes and log message logging.exception("Unexpected error") - if self.raise_on_commit: + if raise_on_commit: raise return False - def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: - return self.get_dates("content", [blob.id]).get(blob.id, None) - - def content_get_early_dates( - self, blobs: Iterable[FileEntry] - ) -> Dict[bytes, datetime]: - return self.get_dates("content", [blob.id for blob in blobs]) - - def content_set_early_date(self, blob: FileEntry, date: datetime): - self.write_cache["content"][blob.id] = date - # update read cache as well - self.read_cache["content"][blob.id] = date - - def directory_get_date_in_isochrone_frontier( - self, directory: DirectoryEntry - ) -> Optional[datetime]: - return self.get_dates("directory", [directory.id]).get(directory.id, None) - - def directory_get_dates_in_isochrone_frontier( - self, dirs: Iterable[DirectoryEntry] - ) -> Dict[bytes, datetime]: - return self.get_dates("directory", [directory.id for directory in dirs]) - - def directory_set_date_in_isochrone_frontier( - self, directory: DirectoryEntry, date: datetime - ): - self.write_cache["directory"][directory.id] = date - # update read cache as well - self.read_cache["directory"][directory.id] = date - - def get_dates(self, table: str, ids: List[bytes]) -> Dict[bytes, datetime]: + def get_dates(self, entity: str, ids: List[bytes]) -> Dict[bytes, datetime]: dates = {} - pending = [] - for sha1 in ids: - # Check whether the date has been queried before - date = self.read_cache[table].get(sha1, None) - if date is not None: - dates[sha1] = date - else: - 
pending.append(sha1) - if pending: - # Otherwise, query the database and cache the values - values = ", ".join(itertools.repeat("%s", len(pending))) + if ids: + values = ", ".join(itertools.repeat("%s", len(ids))) self.cursor.execute( - f"""SELECT sha1, date FROM {table} WHERE sha1 IN ({values})""", - tuple(pending), + f"""SELECT sha1, date FROM {entity} WHERE sha1 IN ({values})""", + tuple(ids), ) - for sha1, date in self.cursor.fetchall(): - dates[sha1] = date - self.read_cache[table][sha1] = date + dates.update(self.cursor.fetchall()) return dates - def insert_entity(self, entity): - # Perform insertions with cached information - if self.write_cache[entity]: + def insert_entity(self, entity: str, data: Dict[bytes, datetime]): + if data: psycopg2.extras.execute_values( self.cursor, f""" LOCK TABLE ONLY {entity}; INSERT INTO {entity}(sha1, date) VALUES %s ON CONFLICT (sha1) DO UPDATE SET date=LEAST(EXCLUDED.date,{entity}.date) """, - self.write_cache[entity].items(), - ) - self.write_cache[entity].clear() - - def insert_all(self): - # First insert entities - self.insert_entity("content") - self.insert_entity("directory") - self.insert_entity("revision") - - # Relations should come after ids for entities were resolved - self.insert_relation("content", "revision", "content_early_in_rev") - self.insert_relation("content", "directory", "content_in_dir") - self.insert_relation("directory", "revision", "directory_in_rev") - - # TODO: this should be updated when origin-revision layer gets properly updated. - # if self.write_cache["revision_before_rev"]: - # psycopg2.extras.execute_values( - # self.cursor, - # """ - # LOCK TABLE ONLY revision_before_rev; - # INSERT INTO revision_before_rev VALUES %s - # ON CONFLICT DO NOTHING - # """, - # self.write_cache["revision_before_rev"], - # ) - # self.write_cache["revision_before_rev"].clear() - # - # if self.write_cache["revision_in_org"]: - # psycopg2.extras.execute_values( - # self.cursor, - # """ - # LOCK TABLE ONLY revision_in_org; - # INSERT INTO revision_in_org VALUES %s - # ON CONFLICT DO NOTHING - # """, - # self.write_cache["revision_in_org"], - # ) - # self.write_cache["revision_in_org"].clear() - - def origin_get_id(self, origin: OriginEntry) -> int: - if origin.id is None: - # Insert origin in the DB and return the assigned id - self.cursor.execute( - """ - LOCK TABLE ONLY origin; - INSERT INTO origin(url) VALUES (%s) - ON CONFLICT DO NOTHING - RETURNING id - """, - (origin.url,), + data.items(), ) - return self.cursor.fetchone()[0] - else: - return origin.id - - def revision_add(self, revision: RevisionEntry): - # Add current revision to the compact DB - self.write_cache["revision"][revision.id] = revision.date - # update read cache as well - self.read_cache["revision"][revision.id] = revision.date - - def revision_add_before_revision( - self, relative: RevisionEntry, revision: RevisionEntry + # XXX: not sure if Python takes a reference or a copy. + # This might be useless! + data.clear() + + def insert_relation( + self, src: str, dst: str, relation: str, data: Set[Tuple[bytes, bytes, bytes]] ): - self.write_cache["revision_before_rev"].append((revision.id, relative.id)) + ... - def revision_add_to_origin(self, origin: OriginEntry, revision: RevisionEntry): - self.write_cache["revision_in_org"].append((revision.id, origin.id)) + def content_find_first( + self, blob: bytes + ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]: + ... 
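For reference, a sketch of the cache dictionary that ProvenanceBackend (defined later in this diff) passes to commit(); only the keys and container types are taken from clear_caches() and the type hints above, while the literal sha1/path values are made up:

from datetime import datetime, timezone

now = datetime.now(timezone.utc)
data = {
    # entity tables: sha1 -> earliest known date
    "content": {b"<blob sha1>": now},
    "directory": {b"<dir sha1>": now},
    "revision": {b"<rev sha1>": now},
    # relation tables: (src sha1, dst sha1, path) triples
    "content_early_in_rev": {(b"<blob sha1>", b"<rev sha1>", b"a/b/c")},
    "content_in_dir": {(b"<blob sha1>", b"<dir sha1>", b"c")},
    "directory_in_rev": {(b"<dir sha1>", b"<rev sha1>", b"a/b")},
    # origin-revision layer, currently skipped by commit() (see the TODO above)
    "revision_before_rev": [],
    "revision_in_org": [],
}
# db is an instance of a concrete subclass, e.g. ProvenanceWithPathDB
ok = db.commit(data, raise_on_commit=True)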
- def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]: - return self.get_dates("revision", [revision.id]).get(revision.id, None) + def content_find_all( + self, blob: bytes, limit: Optional[int] = None + ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]: + ... - def revision_get_preferred_origin(self, revision: RevisionEntry) -> int: - # TODO: adapt this method to consider cached values + def origin_get_id(self, url: str) -> int: + # Insert origin in the DB and return the assigned id self.cursor.execute( - """SELECT COALESCE(org,0) FROM revision WHERE sha1=%s""", (revision.id,) + """ + LOCK TABLE ONLY origin; + INSERT INTO origin(url) VALUES (%s) + ON CONFLICT DO NOTHING + RETURNING id + """, + (url,), + ) + return self.cursor.fetchone()[0] + + def revision_get_preferred_origin(self, revision: bytes) -> int: + self.cursor.execute( + """SELECT COALESCE(org,0) FROM revision WHERE sha1=%s""", (revision,) ) row = self.cursor.fetchone() # None means revision is not in database; # 0 means revision has no preferred origin return row[0] if row is not None and row[0] != 0 else None - def revision_in_history(self, revision: RevisionEntry) -> bool: - # TODO: adapt this method to consider cached values + def revision_in_history(self, revision: bytes) -> bool: self.cursor.execute( """ SELECT 1 FROM revision_before_rev JOIN revision ON revision.id=revision_before_rev.prev WHERE revision.sha1=%s """, - (revision.id,), + (revision,), ) return self.cursor.fetchone() is not None - def revision_set_preferred_origin( - self, origin: OriginEntry, revision: RevisionEntry - ): - # TODO: adapt this method to consider cached values + def revision_set_preferred_origin(self, origin: int, revision: bytes): self.cursor.execute( - """UPDATE revision SET org=%s WHERE sha1=%s""", (origin.id, revision.id) + """UPDATE revision SET org=%s WHERE sha1=%s""", (origin, revision) ) - def revision_visited(self, revision: RevisionEntry) -> bool: - # TODO: adapt this method to consider cached values + def revision_visited(self, revision: bytes) -> bool: self.cursor.execute( """ SELECT 1 FROM revision_in_org JOIN revision ON revision.id=revision_in_org.rev WHERE revision.sha1=%s """, - (revision.id,), + (revision,), ) return self.cursor.fetchone() is not None diff --git a/swh/provenance/postgresql/provenancedb_with_path.py b/swh/provenance/postgresql/provenancedb_with_path.py index 1a7ea05..98e240c 100644 --- a/swh/provenance/postgresql/provenancedb_with_path.py +++ b/swh/provenance/postgresql/provenancedb_with_path.py @@ -1,158 +1,132 @@ from datetime import datetime -import os -from typing import Generator, Optional, Tuple +from typing import Generator, Optional, Set, Tuple import psycopg2 import psycopg2.extras -from ..model import DirectoryEntry, FileEntry -from ..revision import RevisionEntry from .provenancedb_base import ProvenanceDBBase -def normalize(path: bytes) -> bytes: - return path[2:] if path.startswith(bytes("." 
+ os.path.sep, "utf-8")) else path - - class ProvenanceWithPathDB(ProvenanceDBBase): - def content_add_to_directory( - self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes - ): - self.write_cache["content_in_dir"].add( - (blob.id, directory.id, normalize(os.path.join(prefix, blob.name))) - ) - - def content_add_to_revision( - self, revision: RevisionEntry, blob: FileEntry, prefix: bytes - ): - self.write_cache["content_early_in_rev"].add( - (blob.id, revision.id, normalize(os.path.join(prefix, blob.name))) - ) - def content_find_first( - self, blobid: bytes + self, blob: bytes ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]: self.cursor.execute( """ SELECT C.sha1 AS blob, R.sha1 AS rev, R.date AS date, L.path AS path FROM content AS C INNER JOIN content_early_in_rev AS CR ON (CR.blob = C.id) INNER JOIN location as L ON (CR.loc = L.id) INNER JOIN revision as R ON (CR.rev = R.id) WHERE C.sha1=%s ORDER BY date, rev, path ASC LIMIT 1 """, - (blobid,), + (blob,), ) return self.cursor.fetchone() def content_find_all( - self, blobid: bytes, limit: Optional[int] = None + self, blob: bytes, limit: Optional[int] = None ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]: early_cut = f"LIMIT {limit}" if limit is not None else "" self.cursor.execute( f""" (SELECT C.sha1 AS blob, R.sha1 AS rev, R.date AS date, L.path AS path FROM content AS C INNER JOIN content_early_in_rev AS CR ON (CR.blob = C.id) INNER JOIN location AS L ON (CR.loc = L.id) INNER JOIN revision AS R ON (CR.rev = R.id) WHERE C.sha1=%s) UNION (SELECT C.sha1 AS blob, R.sha1 AS rev, R.date AS date, CASE DL.path WHEN '' THEN CL.path WHEN '.' THEN CL.path ELSE (DL.path || '/' || CL.path)::unix_path END AS path FROM content AS C INNER JOIN content_in_dir AS CD ON (C.id = CD.blob) INNER JOIN directory_in_rev AS DR ON (CD.dir = DR.dir) INNER JOIN revision AS R ON (DR.rev = R.id) INNER JOIN location AS CL ON (CD.loc = CL.id) INNER JOIN location AS DL ON (DR.loc = DL.id) WHERE C.sha1=%s) ORDER BY date, rev, path {early_cut} """, - (blobid, blobid), + (blob, blob), ) # TODO: use POSTGRESQL EXPLAIN looking for query optimizations. yield from self.cursor.fetchall() - def directory_add_to_revision( - self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes + def insert_relation( + self, src: str, dst: str, relation: str, data: Set[Tuple[bytes, bytes, bytes]] ): - self.write_cache["directory_in_rev"].add( - (directory.id, revision.id, normalize(path)) - ) - - def insert_relation(self, src, dst, relation): - """Insert entries in `relation` from the write_cache + """Insert entries in `relation` from `data` Also insert missing location entries in the 'location' table. 
""" - if self.write_cache[relation]: + if data: # TODO: find a better way of doing this; might be doable in a couple of # SQL queries (one to insert missing entries in the location' table, # one to insert entries in the relation) # Resolve src ids - src_sha1s = tuple(set(sha1 for (sha1, _, _) in self.write_cache[relation])) + src_sha1s = tuple(set(sha1 for (sha1, _, _) in data)) fmt = ",".join(["%s"] * len(src_sha1s)) self.cursor.execute( f"""SELECT sha1, id FROM {src} WHERE sha1 IN ({fmt})""", src_sha1s, ) src_values = dict(self.cursor.fetchall()) # Resolve dst ids - dst_sha1s = tuple(set(sha1 for (_, sha1, _) in self.write_cache[relation])) + dst_sha1s = tuple(set(sha1 for (_, sha1, _) in data)) fmt = ",".join(["%s"] * len(dst_sha1s)) self.cursor.execute( f"""SELECT sha1, id FROM {dst} WHERE sha1 IN ({fmt})""", dst_sha1s, ) dst_values = dict(self.cursor.fetchall()) # insert missing locations - locations = tuple(set((loc,) for (_, _, loc) in self.write_cache[relation])) + locations = tuple(set((loc,) for (_, _, loc) in data)) psycopg2.extras.execute_values( self.cursor, """ LOCK TABLE ONLY location; INSERT INTO location(path) VALUES %s ON CONFLICT (path) DO NOTHING """, locations, ) # fetch location ids fmt = ",".join(["%s"] * len(locations)) self.cursor.execute( f"SELECT path, id FROM location WHERE path IN ({fmt})", locations, ) loc_ids = dict(self.cursor.fetchall()) # Insert values in relation rows = [ (src_values[sha1_src], dst_values[sha1_dst], loc_ids[loc]) - for (sha1_src, sha1_dst, loc) in self.write_cache[relation] + for (sha1_src, sha1_dst, loc) in data ] psycopg2.extras.execute_values( self.cursor, f""" LOCK TABLE ONLY {relation}; INSERT INTO {relation} VALUES %s ON CONFLICT DO NOTHING """, rows, ) - self.write_cache[relation].clear() + data.clear() diff --git a/swh/provenance/postgresql/provenancedb_without_path.py b/swh/provenance/postgresql/provenancedb_without_path.py index 7c0fe78..7005348 100644 --- a/swh/provenance/postgresql/provenancedb_without_path.py +++ b/swh/provenance/postgresql/provenancedb_without_path.py @@ -1,141 +1,122 @@ from datetime import datetime import itertools import operator -from typing import Generator, Optional, Tuple +from typing import Generator, Optional, Set, Tuple import psycopg2 import psycopg2.extras -from ..model import DirectoryEntry, FileEntry -from ..revision import RevisionEntry from .provenancedb_base import ProvenanceDBBase ######################################################################################## ######################################################################################## ######################################################################################## class ProvenanceWithoutPathDB(ProvenanceDBBase): - def content_add_to_directory( - self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes - ): - self.write_cache["content_in_dir"].add((blob.id, directory.id)) - - def content_add_to_revision( - self, revision: RevisionEntry, blob: FileEntry, prefix: bytes - ): - self.write_cache["content_early_in_rev"].add((blob.id, revision.id)) - def content_find_first( - self, blobid: bytes + self, blob: bytes ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]: self.cursor.execute( """ SELECT revision.sha1 AS rev, revision.date AS date FROM (SELECT content_early_in_rev.rev FROM content_early_in_rev JOIN content ON content.id=content_early_in_rev.blob WHERE content.sha1=%s ) AS content_in_rev JOIN revision ON revision.id=content_in_rev.rev ORDER BY date, rev ASC LIMIT 1 """, - (blobid,), + (blob,), ) row = 
self.cursor.fetchone() if row is not None: - # TODO: query revision from the archive and look for blobid into a + # TODO: query revision from the archive and look for blob into a # recursive directory_ls of the revision's root. - return blobid, row[0], row[1], b"" + return blob, row[0], row[1], b"" return None def content_find_all( - self, blobid: bytes, limit: Optional[int] = None + self, blob: bytes, limit: Optional[int] = None ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]: early_cut = f"LIMIT {limit}" if limit is not None else "" self.cursor.execute( f""" (SELECT revision.sha1 AS rev, revision.date AS date FROM (SELECT content_early_in_rev.rev FROM content_early_in_rev JOIN content ON content.id=content_early_in_rev.blob WHERE content.sha1=%s ) AS content_in_rev JOIN revision ON revision.id=content_in_rev.rev ) UNION (SELECT revision.sha1 AS rev, revision.date AS date FROM (SELECT directory_in_rev.rev FROM (SELECT content_in_dir.dir FROM content_in_dir JOIN content ON content_in_dir.blob=content.id WHERE content.sha1=%s ) AS content_dir JOIN directory_in_rev ON directory_in_rev.dir=content_dir.dir ) AS content_in_rev JOIN revision ON revision.id=content_in_rev.rev ) ORDER BY date, rev {early_cut} """, - (blobid, blobid), + (blob, blob), ) # TODO: use POSTGRESQL EXPLAIN looking for query optimizations. for row in self.cursor.fetchall(): - # TODO: query revision from the archive and look for blobid into a + # TODO: query revision from the archive and look for blob into a # recursive directory_ls of the revision's root. - yield blobid, row[0], row[1], b"" + yield blob, row[0], row[1], b"" - def directory_add_to_revision( - self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes + def insert_relation( + self, src: str, dst: str, relation: str, data: Set[Tuple[bytes, bytes, bytes]] ): - self.write_cache["directory_in_rev"].add((directory.id, revision.id)) - - def insert_relation(self, src, dst, relation): - if self.write_cache[relation]: + if data: # Resolve src ids - src_values = dict().fromkeys( - map(operator.itemgetter(0), self.write_cache[relation]) - ) + src_values = dict().fromkeys(map(operator.itemgetter(0), data)) values = ", ".join(itertools.repeat("%s", len(src_values))) self.cursor.execute( f"""SELECT sha1, id FROM {src} WHERE sha1 IN ({values})""", tuple(src_values), ) src_values = dict(self.cursor.fetchall()) # Resolve dst ids - dst_values = dict().fromkeys( - map(operator.itemgetter(1), self.write_cache[relation]) - ) + dst_values = dict().fromkeys(map(operator.itemgetter(1), data)) values = ", ".join(itertools.repeat("%s", len(dst_values))) self.cursor.execute( f"""SELECT sha1, id FROM {dst} WHERE sha1 IN ({values})""", tuple(dst_values), ) dst_values = dict(self.cursor.fetchall()) # Insert values in relation rows = map( lambda row: (src_values[row[0]], dst_values[row[1]]), - self.write_cache[relation], + data, ) psycopg2.extras.execute_values( self.cursor, f""" LOCK TABLE ONLY {relation}; INSERT INTO {relation} VALUES %s ON CONFLICT DO NOTHING """, rows, ) - self.write_cache[relation].clear() + data.clear() diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py index b8169ce..ab30f8c 100644 --- a/swh/provenance/provenance.py +++ b/swh/provenance/provenance.py @@ -1,603 +1,766 @@ from collections import Counter from datetime import datetime, timezone import logging import os import time -from typing import Dict, Generator, Iterable, List, Optional, Tuple +from typing import Any, Dict, Generator, Iterable, List, Optional, 
Tuple +import psycopg2 from typing_extensions import Protocol, runtime_checkable from swh.model.hashutil import hash_to_hex from .archive import ArchiveInterface from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry UTCMIN = datetime.min.replace(tzinfo=timezone.utc) +# XXX: this protocol doesn't make much sense now that flavours have been delegated to +# another class, lower in the callstack. @runtime_checkable class ProvenanceInterface(Protocol): raise_on_commit: bool = False def commit(self): """Commit currently ongoing transactions in the backend DB""" ... def content_add_to_directory( self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes ) -> None: ... def content_add_to_revision( self, revision: RevisionEntry, blob: FileEntry, prefix: bytes ) -> None: ... def content_find_first( - self, blobid: bytes + self, blob: bytes ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]: ... def content_find_all( - self, blobid: bytes, limit: Optional[int] = None + self, blob: bytes, limit: Optional[int] = None ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]: ... def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: ... def content_get_early_dates( self, blobs: Iterable[FileEntry] ) -> Dict[bytes, datetime]: ... def content_set_early_date(self, blob: FileEntry, date: datetime) -> None: ... def directory_add_to_revision( self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes ) -> None: ... def directory_get_date_in_isochrone_frontier( self, directory: DirectoryEntry ) -> Optional[datetime]: ... def directory_get_dates_in_isochrone_frontier( self, dirs: Iterable[DirectoryEntry] ) -> Dict[bytes, datetime]: ... def directory_set_date_in_isochrone_frontier( self, directory: DirectoryEntry, date: datetime ) -> None: ... def origin_get_id(self, origin: OriginEntry) -> int: ... def revision_add(self, revision: RevisionEntry) -> None: ... def revision_add_before_revision( self, relative: RevisionEntry, revision: RevisionEntry ) -> None: ... def revision_add_to_origin( self, origin: OriginEntry, revision: RevisionEntry ) -> None: ... def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]: ... def revision_get_preferred_origin(self, revision: RevisionEntry) -> int: ... def revision_in_history(self, revision: RevisionEntry) -> bool: ... def revision_set_preferred_origin( self, origin: OriginEntry, revision: RevisionEntry ) -> None: ... def revision_visited(self, revision: RevisionEntry) -> bool: ... +# TODO: maybe move this to a separate file +class ProvenanceBackend: + raise_on_commit: bool = False + + def __init__(self, conn: psycopg2.extensions.connection, with_path: bool = True): + from .postgresql.provenancedb_base import ProvenanceDBBase + + # TODO: this class should not know what the actual used DB is. 
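A sketch of how this backend is meant to be instantiated and driven; the psycopg2 connection parameters are placeholders, not values from this diff:

import psycopg2

from swh.provenance.provenance import ProvenanceBackend

conn = psycopg2.connect(dbname="provenance")           # hypothetical connection
provenance = ProvenanceBackend(conn, with_path=True)   # picks ProvenanceWithPathDB
provenance.raise_on_commit = True                      # as the test fixture below does

# Reads and writes go through the in-memory caches; commit() flushes them to the
# underlying storage in one batch (retrying on failure) and clears the caches.
provenance.commit()                                    # no-op on empty caches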
+ self.storage: ProvenanceDBBase + if with_path: + from .postgresql.provenancedb_with_path import ProvenanceWithPathDB + + self.storage = ProvenanceWithPathDB(conn) + else: + from .postgresql.provenancedb_without_path import ProvenanceWithoutPathDB + + self.storage = ProvenanceWithoutPathDB(conn) + + self.write_cache: Dict[str, Any] = {} + self.read_cache: Dict[str, Any] = {} + self.clear_caches() + + def clear_caches(self): + self.write_cache = { + "content": dict(), + "content_early_in_rev": set(), + "content_in_dir": set(), + "directory": dict(), + "directory_in_rev": set(), + "revision": dict(), + "revision_before_rev": list(), + "revision_in_org": list(), + } + self.read_cache = {"content": dict(), "directory": dict(), "revision": dict()} + + def commit(self): + # TODO: for now we just forward the write_cache. This should be improved! + while not self.storage.commit( + self.write_cache, raise_on_commit=self.raise_on_commit + ): + logging.warning( + f"Unable to commit cached information {self.write_cache}. Retrying..." + ) + self.clear_caches() + + def content_add_to_directory( + self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes + ): + self.write_cache["content_in_dir"].add( + (blob.id, directory.id, normalize(os.path.join(prefix, blob.name))) + ) + + def content_add_to_revision( + self, revision: RevisionEntry, blob: FileEntry, prefix: bytes + ): + self.write_cache["content_early_in_rev"].add( + (blob.id, revision.id, normalize(os.path.join(prefix, blob.name))) + ) + + def content_find_first( + self, blob: bytes + ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]: + return self.storage.content_find_first(blob) + + def content_find_all( + self, blob: bytes, limit: Optional[int] = None + ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]: + yield from self.storage.content_find_all(blob, limit=limit) + + def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: + return self.get_dates("content", [blob.id]).get(blob.id, None) + + def content_get_early_dates( + self, blobs: Iterable[FileEntry] + ) -> Dict[bytes, datetime]: + return self.get_dates("content", [blob.id for blob in blobs]) + + def content_set_early_date(self, blob: FileEntry, date: datetime): + self.write_cache["content"][blob.id] = date + # update read cache as well + self.read_cache["content"][blob.id] = date + + def directory_add_to_revision( + self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes + ): + self.write_cache["directory_in_rev"].add( + (directory.id, revision.id, normalize(path)) + ) + + def directory_get_date_in_isochrone_frontier( + self, directory: DirectoryEntry + ) -> Optional[datetime]: + return self.get_dates("directory", [directory.id]).get(directory.id, None) + + def directory_get_dates_in_isochrone_frontier( + self, dirs: Iterable[DirectoryEntry] + ) -> Dict[bytes, datetime]: + return self.get_dates("directory", [directory.id for directory in dirs]) + + def directory_set_date_in_isochrone_frontier( + self, directory: DirectoryEntry, date: datetime + ): + self.write_cache["directory"][directory.id] = date + # update read cache as well + self.read_cache["directory"][directory.id] = date + + def get_dates(self, entity: str, ids: List[bytes]) -> Dict[bytes, datetime]: + dates = {} + pending = [] + for sha1 in ids: + # Check whether the date has been queried before + date = self.read_cache[entity].get(sha1, None) + if date is not None: + dates[sha1] = date + else: + pending.append(sha1) + dates.update(self.storage.get_dates(entity, pending)) + 
return dates + + def origin_get_id(self, origin: OriginEntry) -> int: + if origin.id is None: + return self.storage.origin_get_id(origin.url) + else: + return origin.id + + def revision_add(self, revision: RevisionEntry): + # Add current revision to the compact DB + self.write_cache["revision"][revision.id] = revision.date + # update read cache as well + self.read_cache["revision"][revision.id] = revision.date + + def revision_add_before_revision( + self, relative: RevisionEntry, revision: RevisionEntry + ): + self.write_cache["revision_before_rev"].append((revision.id, relative.id)) + + def revision_add_to_origin(self, origin: OriginEntry, revision: RevisionEntry): + self.write_cache["revision_in_org"].append((revision.id, origin.id)) + + def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]: + return self.get_dates("revision", [revision.id]).get(revision.id, None) + + def revision_get_preferred_origin(self, revision: RevisionEntry) -> int: + # TODO: adapt this method to consider cached values + return self.storage.revision_get_preferred_origin(revision.id) + + def revision_in_history(self, revision: RevisionEntry) -> bool: + # TODO: adapt this method to consider cached values + return self.storage.revision_in_history(revision.id) + + def revision_set_preferred_origin( + self, origin: OriginEntry, revision: RevisionEntry + ): + assert origin.id is not None + # TODO: adapt this method to consider cached values + self.storage.revision_set_preferred_origin(origin.id, revision.id) + + def revision_visited(self, revision: RevisionEntry) -> bool: + # TODO: adapt this method to consider cached values + return self.storage.revision_visited(revision.id) + + +def normalize(path: bytes) -> bytes: + return path[2:] if path.startswith(bytes("." + os.path.sep, "utf-8")) else path + + def flatten_directory( archive: ArchiveInterface, provenance: ProvenanceInterface, directory: DirectoryEntry, ) -> None: """Recursively retrieve all the files of 'directory' and insert them in the 'provenance' database in the 'content_to_directory' table. """ stack = [(directory, b"")] while stack: current, prefix = stack.pop() current.retrieve_children(archive) for f_child in current.files: # Add content to the directory with the computed prefix. provenance.content_add_to_directory(directory, f_child, prefix) for d_child in current.dirs: # Recursively walk the child directory. stack.append((d_child, os.path.join(prefix, d_child.name))) def origin_add( provenance: ProvenanceInterface, archive: ArchiveInterface, origins: List[OriginEntry], ) -> None: start = time.time() for origin in origins: origin.retrieve_revisions(archive) for revision in origin.revisions: origin_add_revision(provenance, archive, origin, revision) done = time.time() provenance.commit() stop = time.time() logging.debug( "Origins " ";".join( [origin.url + ":" + hash_to_hex(origin.snapshot) for origin in origins] ) + f" were processed in {stop - start} secs (commit took {stop - done} secs)!" ) def origin_add_revision( provenance: ProvenanceInterface, archive: ArchiveInterface, origin: OriginEntry, revision: RevisionEntry, ) -> None: stack: List[Tuple[Optional[RevisionEntry], RevisionEntry]] = [(None, revision)] + origin.id = provenance.origin_get_id(origin) while stack: relative, current = stack.pop() # Check if current revision has no preferred origin and update if necessary. 
preferred = provenance.revision_get_preferred_origin(current) if preferred is None: provenance.revision_set_preferred_origin(origin, current) ######################################################################## if relative is None: # This revision is pointed directly by the origin. visited = provenance.revision_visited(current) provenance.revision_add_to_origin(origin, current) if not visited: stack.append((current, current)) else: # This revision is a parent of another one in the history of the # relative revision. for parent in current.parents(archive): visited = provenance.revision_visited(parent) if not visited: # The parent revision has never been seen before pointing # directly to an origin. known = provenance.revision_in_history(parent) if known: # The parent revision is already known in some other # revision's history. We should point it directly to # the origin and (eventually) walk its history. stack.append((None, parent)) else: # The parent revision was never seen before. We should # walk its history and associate it with the same # relative revision. provenance.revision_add_before_revision(relative, parent) stack.append((relative, parent)) else: # The parent revision already points to an origin, so its # history was properly processed before. We just need to # make sure it points to the current origin as well. provenance.revision_add_to_origin(origin, parent) def revision_add( provenance: ProvenanceInterface, archive: ArchiveInterface, revisions: List[RevisionEntry], trackall: bool = True, lower: bool = True, mindepth: int = 1, commit: bool = True, ) -> None: start = time.time() for revision in revisions: assert revision.date is not None assert revision.root is not None # Processed content starting from the revision's root directory. date = provenance.revision_get_early_date(revision) if date is None or revision.date < date: logging.debug( f"Processing revisions {hash_to_hex(revision.id)}" f" (known date {date} / revision date {revision.date})..." ) graph = build_isochrone_graph( archive, provenance, revision, DirectoryEntry(revision.root), ) # TODO: add file size filtering revision_process_content( archive, provenance, revision, graph, trackall=trackall, lower=lower, mindepth=mindepth, ) done = time.time() if commit: - # TODO: improve this! Maybe using a max attempt counter? - # Ideally Provenance class should guarantee that a commit never fails. - while not provenance.commit(): - logging.warning( - "Could not commit revisions " - + ";".join([hash_to_hex(revision.id) for revision in revisions]) - + ". Retrying..." - ) + provenance.commit() stop = time.time() logging.debug( f"Revisions {';'.join([hash_to_hex(revision.id) for revision in revisions])} " f" were processed in {stop - start} secs (commit took {stop - done} secs)!" 
) # logging.critical( # ";".join([hash_to_hex(revision.id) for revision in revisions]) # + f",{stop - start},{stop - done}" # ) class IsochroneNode: def __init__( self, entry: DirectoryEntry, dbdate: Optional[datetime] = None, depth: int = 0, prefix: bytes = b"", ): self.entry = entry self.depth = depth # dbdate is the maxdate for this node that comes from the DB self._dbdate: Optional[datetime] = dbdate # maxdate is set by the maxdate computation algorithm self.maxdate: Optional[datetime] = None # known is True if this node is already known in the db; either because # the current directory actually exists in the database, or because all # the content of the current directory is known (subdirectories and files) self.known = self.dbdate is not None self.invalid = False self.path = os.path.join(prefix, self.entry.name) if prefix else self.entry.name self.children: List[IsochroneNode] = [] @property def dbdate(self): # use a property to make this attribute (mostly) read-only return self._dbdate def invalidate(self): self._dbdate = None self.maxdate = None self.known = False self.invalid = True def add_directory( self, child: DirectoryEntry, date: Optional[datetime] = None ) -> "IsochroneNode": # we should not be processing this node (ie add subdirectories or # files) if it's actually known by the provenance DB assert self.dbdate is None node = IsochroneNode(child, dbdate=date, depth=self.depth + 1, prefix=self.path) self.children.append(node) return node def __str__(self): return ( f"<{self.entry}: dbdate={self.dbdate}, maxdate={self.maxdate}, " f"known={self.known}, invalid={self.invalid}, path={self.path}, " f"children=[{', '.join(str(child) for child in self.children)}]>" ) def __eq__(self, other): return ( isinstance(other, IsochroneNode) and ( self.entry, self.depth, self._dbdate, self.maxdate, self.known, self.invalid, self.path, ) == ( other.entry, other.depth, other._dbdate, other.maxdate, other.known, other.invalid, other.path, ) and Counter(self.children) == Counter(other.children) ) def __hash__(self): return hash( ( self.entry, self.depth, self._dbdate, self.maxdate, self.known, self.invalid, self.path, ) ) def build_isochrone_graph( archive: ArchiveInterface, provenance: ProvenanceInterface, revision: RevisionEntry, directory: DirectoryEntry, ) -> IsochroneNode: assert revision.date is not None assert revision.root == directory.id # this function process a revision in 2 steps: # # 1. build the tree structure of IsochroneNode objects (one INode per # directory under the root directory of the revision but not following # known subdirectories), and gather the dates from the DB for already # known objects; for files, just keep all the dates in a global 'fdates' # dict; note that in this step, we will only recurse the directories # that are not already known. # # 2. compute the maxdate for each node of the tree that was not found in the DB. # Build the nodes structure root_date = provenance.directory_get_date_in_isochrone_frontier(directory) root = IsochroneNode(directory, dbdate=root_date) stack = [root] logging.debug( f"Recursively creating graph for revision {hash_to_hex(revision.id)}..." ) fdates: Dict[bytes, datetime] = {} # map {file_id: date} while stack: current = stack.pop() if current.dbdate is None or current.dbdate > revision.date: # If current directory has an associated date in the isochrone frontier that # is greater or equal to the current revision's one, it should be ignored as # the revision is being processed out of order. 
if current.dbdate is not None and current.dbdate > revision.date: logging.debug( f"Invalidating frontier on {hash_to_hex(current.entry.id)}" f" (date {current.dbdate})" f" when processing revision {hash_to_hex(revision.id)}" f" (date {revision.date})" ) current.invalidate() # Pre-query all known dates for directories in the current directory # for the provenance object to have them cached and (potentially) improve # performance. current.entry.retrieve_children(archive) ddates = provenance.directory_get_dates_in_isochrone_frontier( current.entry.dirs ) for dir in current.entry.dirs: # Recursively analyse subdirectory nodes node = current.add_directory(dir, date=ddates.get(dir.id, None)) stack.append(node) fdates.update(provenance.content_get_early_dates(current.entry.files)) logging.debug( f"Isochrone graph for revision {hash_to_hex(revision.id)} successfully created!" ) # Precalculate max known date for each node in the graph (only directory nodes are # pushed to the stack). logging.debug(f"Computing maxdates for revision {hash_to_hex(revision.id)}...") stack = [root] while stack: current = stack.pop() # Current directory node is known if it already has an assigned date (ie. it was # already seen as an isochrone frontier). if current.known: assert current.maxdate is None current.maxdate = current.dbdate else: if any(x.maxdate is None for x in current.children): # at least one child of current has no maxdate yet # Current node needs to be analysed again after its children. stack.append(current) for child in current.children: if child.maxdate is None: # if child.maxdate is None, it must be processed stack.append(child) else: # all the files and directories under current have a maxdate, # we can infer the maxdate for current directory assert current.maxdate is None # if all content is already known, update current directory info. current.maxdate = max( [UTCMIN] + [ child.maxdate for child in current.children if child.maxdate is not None # unnecessary, but needed for mypy ] + [ fdates.get(file.id, revision.date) for file in current.entry.files ] ) if current.maxdate <= revision.date: current.known = ( # true if all subdirectories are known all(child.known for child in current.children) # true if all files are in fdates, i.e. if all files were known # *before building this isochrone graph node* # Note: the 'all()' is lazy: will stop iterating as soon as # possible and all((file.id in fdates) for file in current.entry.files) ) else: # at least one content is being processed out-of-order, then current # node should be treated as unknown current.maxdate = revision.date current.known = False logging.debug( f"Maxdates for revision {hash_to_hex(revision.id)} successfully computed!" ) return root def revision_process_content( archive: ArchiveInterface, provenance: ProvenanceInterface, revision: RevisionEntry, graph: IsochroneNode, trackall: bool = True, lower: bool = True, mindepth: int = 1, ): assert revision.date is not None provenance.revision_add(revision) stack = [graph] while stack: current = stack.pop() if current.dbdate is not None: assert current.dbdate <= revision.date if trackall: # Current directory is an outer isochrone frontier for a previously # processed revision. It should be reused as is. provenance.directory_add_to_revision( revision, current.entry, current.path ) else: assert current.maxdate is not None # Current directory is not an outer isochrone frontier for any previous # revision. It might be eligible for this one. 
if is_new_frontier( current, revision=revision, trackall=trackall, lower=lower, mindepth=mindepth, ): # Outer frontier should be moved to current position in the isochrone # graph. This is the first time this directory is found in the isochrone # frontier. provenance.directory_set_date_in_isochrone_frontier( current.entry, current.maxdate ) if trackall: provenance.directory_add_to_revision( revision, current.entry, current.path ) flatten_directory(archive, provenance, current.entry) else: # If current node is an invalidated frontier, update its date for future # revisions to get the proper value. if current.invalid: provenance.directory_set_date_in_isochrone_frontier( current.entry, current.maxdate ) # No point moving the frontier here. Either there are no files or they # are being seen for the first time here. Add all blobs to current # revision updating date if necessary, and recursively analyse # subdirectories as candidates to the outer frontier. for blob in current.entry.files: date = provenance.content_get_early_date(blob) if date is None or revision.date < date: provenance.content_set_early_date(blob, revision.date) provenance.content_add_to_revision(revision, blob, current.path) for child in current.children: stack.append(child) def is_new_frontier( node: IsochroneNode, revision: RevisionEntry, trackall: bool = True, lower: bool = True, mindepth: int = 1, ) -> bool: assert node.maxdate is not None # for mypy assert revision.date is not None # idem if trackall: # The only real condition for a directory to be a frontier is that its # content is already known and its maxdate is less (or equal) than # current revision's date. Checking mindepth is meant to skip root # directories (or any arbitrary depth) to improve the result. The # option lower tries to maximize the reusage rate of previously defined # frontiers by keeping them low in the directory tree. return ( node.known and node.maxdate <= revision.date # all content is earlier than revision and node.depth >= mindepth # current node is deeper than the min allowed depth and (has_blobs(node) if lower else True) # there is at least one blob in it ) else: # If we are only tracking first occurrences, we want to ensure that all first # occurrences end up in the content_early_in_rev relation. Thus, we force for # every blob outside a frontier to have an extrictly earlier date. return ( node.maxdate < revision.date # all content is earlier than revision and node.depth >= mindepth # deeper than the min allowed depth and (has_blobs(node) if lower else True) # there is at least one blob ) def has_blobs(node: IsochroneNode) -> bool: # We may want to look for files in different ways to decide whether to define a # frontier or not: # 1. Only files in current node: return any(node.entry.files) # 2. Files anywhere in the isochrone graph # stack = [node] # while stack: # current = stack.pop() # if any( # map(lambda child: isinstance(child.entry, FileEntry), current.children)): # return True # else: # # All children are directory entries. # stack.extend(current.children) # return False # 3. Files in the intermediate directories between current node and any previously # defined frontier: # TODO: complete this case! 
# return any( # map(lambda child: isinstance(child.entry, FileEntry), node.children) # ) or all( # map( # lambda child: ( # not (isinstance(child.entry, DirectoryEntry) and child.date is None) # ) # or has_blobs(child), # node.children, # ) # ) diff --git a/swh/provenance/tests/conftest.py b/swh/provenance/tests/conftest.py index ba8b7fb..62c49cd 100644 --- a/swh/provenance/tests/conftest.py +++ b/swh/provenance/tests/conftest.py @@ -1,254 +1,252 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import glob from os import path import re from typing import Iterable, Iterator, List, Optional import pytest from typing_extensions import TypedDict from swh.core.api.serializers import msgpack_loads from swh.core.db import BaseDb from swh.core.db.pytest_plugin import postgresql_fact from swh.core.utils import numfile_sortkey as sortkey from swh.model.model import Content, Directory, DirectoryEntry, Revision from swh.model.tests.swh_model_data import TEST_OBJECTS import swh.provenance from swh.provenance.postgresql.archive import ArchivePostgreSQL from swh.provenance.storage.archive import ArchiveStorage SQL_DIR = path.join(path.dirname(swh.provenance.__file__), "sql") SQL_FILES = [ sqlfile for sqlfile in sorted(glob.glob(path.join(SQL_DIR, "*.sql")), key=sortkey) if "-without-path-" not in sqlfile ] provenance_db = postgresql_fact( "postgresql_proc", dbname="provenance", dump_files=SQL_FILES ) @pytest.fixture def provenance(provenance_db): """return a working and initialized provenance db""" - from swh.provenance.postgresql.provenancedb_with_path import ( - ProvenanceWithPathDB as ProvenanceDB, - ) + from swh.provenance.provenance import ProvenanceBackend BaseDb.adapt_conn(provenance_db) - prov = ProvenanceDB(provenance_db) + prov = ProvenanceBackend(provenance_db) # in test sessions, we DO want to raise any exception occurring at commit time prov.raise_on_commit = True return prov @pytest.fixture def swh_storage_with_objects(swh_storage): """return a Storage object (postgresql-based by default) with a few of each object type in it The inserted content comes from swh.model.tests.swh_model_data. """ for obj_type in ( "content", "skipped_content", "directory", "revision", "release", "snapshot", "origin", "origin_visit", "origin_visit_status", ): getattr(swh_storage, f"{obj_type}_add")(TEST_OBJECTS[obj_type]) return swh_storage @pytest.fixture def archive_direct(swh_storage_with_objects): return ArchivePostgreSQL(swh_storage_with_objects.get_db().conn) @pytest.fixture def archive_api(swh_storage_with_objects): return ArchiveStorage(swh_storage_with_objects) @pytest.fixture(params=["archive", "db"]) def archive(request, swh_storage_with_objects): """Return a ArchivePostgreSQL based StorageInterface object""" # this is a workaround to prevent tests from hanging because of an unclosed # transaction. 
# TODO: refactor the ArchivePostgreSQL to properly deal with # transactions and get rif of this fixture if request.param == "db": archive = ArchivePostgreSQL(conn=swh_storage_with_objects.get_db().conn) yield archive archive.conn.rollback() else: yield ArchiveStorage(swh_storage_with_objects) def get_datafile(fname): return path.join(path.dirname(__file__), "data", fname) def load_repo_data(repo): data = {"revision": [], "directory": [], "content": []} with open(get_datafile(f"{repo}.msgpack"), "rb") as fobj: for etype, value in msgpack_loads(fobj.read()): data[etype].append(value) return data def filter_dict(d, keys): return {k: v for (k, v) in d.items() if k in keys} def fill_storage(storage, data): storage.content_add_metadata( Content.from_dict(content) for content in data["content"] ) storage.directory_add( [ Directory( entries=tuple( [ DirectoryEntry.from_dict( filter_dict(entry, ("name", "type", "target", "perms")) ) for entry in dir["entries"] ] ) ) for dir in data["directory"] ] ) storage.revision_add(Revision.from_dict(revision) for revision in data["revision"]) class SynthRelation(TypedDict): prefix: Optional[str] path: str src: bytes dst: bytes rel_ts: float class SynthRevision(TypedDict): sha1: bytes date: float msg: str R_C: List[SynthRelation] R_D: List[SynthRelation] D_C: List[SynthRelation] def synthetic_result(filename: str) -> Iterator[SynthRevision]: """Generates dict representations of synthetic revisions found in the synthetic file (from the data/ directory) given as argument of the generator. Generated SynthRevision (typed dict) with the following elements: "sha1": (bytes) sha1 of the revision, "date": (float) timestamp of the revision, "msg": (str) commit message of the revision, "R_C": (list) new R---C relations added by this revision "R_D": (list) new R-D relations added by this revision "D_C": (list) new D-C relations added by this revision Each relation above is a SynthRelation typed dict with: "path": (str) location "src": (bytes) sha1 of the source of the relation "dst": (bytes) sha1 of the destination of the relation "rel_ts": (float) timestamp of the target of the relation (related to the timestamp of the revision) """ with open(get_datafile(filename), "r") as fobj: yield from _parse_synthetic_file(fobj) def _parse_synthetic_file(fobj: Iterable[str]) -> Iterator[SynthRevision]: """Read a 'synthetic' file and generate a dict representation of the synthetic revision for each revision listed in the synthetic file. 
""" regs = [ "(?PR[0-9]{2,4})?", "(?P[^| ]*)", "([+] )?(?P[^| +]*?)[/]?", "(?P[RDC]) (?P[0-9a-z]{40})", "(?P-?[0-9]+(.[0-9]+)?)", ] regex = re.compile("^ *" + r" *[|] *".join(regs) + r" *(#.*)?$") current_rev: List[dict] = [] for m in (regex.match(line) for line in fobj): if m: d = m.groupdict() if d["revname"]: if current_rev: yield _mk_synth_rev(current_rev) current_rev.clear() current_rev.append(d) if current_rev: yield _mk_synth_rev(current_rev) def _mk_synth_rev(synth_rev) -> SynthRevision: assert synth_rev[0]["type"] == "R" rev = SynthRevision( sha1=bytes.fromhex(synth_rev[0]["sha1"]), date=float(synth_rev[0]["ts"]), msg=synth_rev[0]["revname"], R_C=[], R_D=[], D_C=[], ) current_path = None # path of the last R-D relation we parsed, used a prefix for next D-C # relations for row in synth_rev[1:]: if row["reltype"] == "R---C": assert row["type"] == "C" rev["R_C"].append( SynthRelation( prefix=None, path=row["path"], src=rev["sha1"], dst=bytes.fromhex(row["sha1"]), rel_ts=float(row["ts"]), ) ) current_path = None elif row["reltype"] == "R-D": assert row["type"] == "D" rev["R_D"].append( SynthRelation( prefix=None, path=row["path"], src=rev["sha1"], dst=bytes.fromhex(row["sha1"]), rel_ts=float(row["ts"]), ) ) current_path = row["path"] elif row["reltype"] == "D-C": assert row["type"] == "C" rev["D_C"].append( SynthRelation( prefix=current_path, path=row["path"], src=rev["R_D"][-1]["dst"], dst=bytes.fromhex(row["sha1"]), rel_ts=float(row["ts"]), ) ) return rev diff --git a/swh/provenance/tests/test_conftest.py b/swh/provenance/tests/test_conftest.py index f45fe0b..f0725ec 100644 --- a/swh/provenance/tests/test_conftest.py +++ b/swh/provenance/tests/test_conftest.py @@ -1,19 +1,19 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information def test_provenance_fixture(provenance): """Check the 'provenance' fixture produce a working ProvenanceDB object""" assert provenance - provenance.insert_all() # should be a noop + provenance.commit() # should be a noop def test_storage(swh_storage_with_objects): """Check the 'swh_storage_with_objects' fixture produce a working Storage object with at least some Content, Revision and Directory in it""" assert swh_storage_with_objects assert swh_storage_with_objects.content_get_random() assert swh_storage_with_objects.directory_get_random() assert swh_storage_with_objects.revision_get_random() diff --git a/swh/provenance/tests/test_provenance_heuristics.py b/swh/provenance/tests/test_provenance_heuristics.py index 10ab31b..c55005c 100644 --- a/swh/provenance/tests/test_provenance_heuristics.py +++ b/swh/provenance/tests/test_provenance_heuristics.py @@ -1,321 +1,325 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Dict, List, Tuple import pytest from swh.provenance.model import RevisionEntry from swh.provenance.provenance import revision_add from swh.provenance.tests.conftest import ( fill_storage, get_datafile, load_repo_data, synthetic_result, ) from swh.provenance.tests.test_provenance_db import ts2dt def sha1s(cur, table): """return the 'sha1' column from the DB 'table' (as hex) 'cur' is a cursor to the provenance index DB. 
""" cur.execute(f"SELECT sha1 FROM {table}") return set(sha1.hex() for (sha1,) in cur.fetchall()) def locations(cur): """return the 'path' column from the DB location table 'cur' is a cursor to the provenance index DB. """ cur.execute("SELECT encode(location.path::bytea, 'escape') FROM location") return set(x for (x,) in cur.fetchall()) def relations(cur, src, dst): """return the triplets ('sha1', 'sha1', 'path') from the DB for the relation between 'src' table and 'dst' table (i.e. for C-R, C-D and D-R relations). 'cur' is a cursor to the provenance index DB. """ relation = { ("content", "revision"): "content_early_in_rev", ("content", "directory"): "content_in_dir", ("directory", "revision"): "directory_in_rev", }[(src, dst)] srccol = {"content": "blob", "directory": "dir"}[src] dstcol = {"directory": "dir", "revision": "rev"}[dst] cur.execute( f"SELECT encode(src.sha1::bytea, 'hex')," f" encode(dst.sha1::bytea, 'hex')," f" encode(location.path::bytea, 'escape') " f"FROM {relation} as rel, " f" {src} as src, {dst} as dst, location " f"WHERE rel.{srccol}=src.id AND rel.{dstcol}=dst.id AND rel.loc=location.id" ) return set(cur.fetchall()) def get_timestamp(cur, table, sha1): """return the date for the 'sha1' from the DB 'table' (as hex) 'cur' is a cursor to the provenance index DB. """ if isinstance(sha1, str): sha1 = bytes.fromhex(sha1) cur.execute(f"SELECT date FROM {table} WHERE sha1=%s", (sha1,)) return [date.timestamp() for (date,) in cur.fetchall()] @pytest.mark.parametrize( "repo, lower, mindepth", ( ("cmdbts2", True, 1), ("cmdbts2", False, 1), ("cmdbts2", True, 2), ("cmdbts2", False, 2), ("out-of-order", True, 1), ), ) def test_provenance_heuristics(provenance, swh_storage, archive, repo, lower, mindepth): # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) fill_storage(swh_storage, data) syntheticfile = get_datafile( f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt" ) revisions = {rev["id"]: rev for rev in data["revision"]} rows = { "content": set(), "content_in_dir": set(), "content_early_in_rev": set(), "directory": set(), "directory_in_rev": set(), "location": set(), "revision": set(), } for synth_rev in synthetic_result(syntheticfile): revision = revisions[synth_rev["sha1"]] entry = RevisionEntry( id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], ) revision_add(provenance, archive, [entry], lower=lower, mindepth=mindepth) # each "entry" in the synth file is one new revision rows["revision"].add(synth_rev["sha1"].hex()) - assert rows["revision"] == sha1s(provenance.cursor, "revision"), synth_rev[ - "msg" - ] + assert rows["revision"] == sha1s( + provenance.storage.cursor, "revision" + ), synth_rev["msg"] # check the timestamp of the revision rev_ts = synth_rev["date"] assert get_timestamp( - provenance.cursor, "revision", synth_rev["sha1"].hex() + provenance.storage.cursor, "revision", synth_rev["sha1"].hex() ) == [rev_ts], synth_rev["msg"] # this revision might have added new content objects rows["content"] |= set(x["dst"].hex() for x in synth_rev["R_C"]) rows["content"] |= set(x["dst"].hex() for x in synth_rev["D_C"]) - assert rows["content"] == sha1s(provenance.cursor, "content"), synth_rev["msg"] + assert rows["content"] == sha1s( + provenance.storage.cursor, "content" + ), synth_rev["msg"] # check for R-C (direct) entries # these are added directly in the content_early_in_rev table rows["content_early_in_rev"] |= set( (x["dst"].hex(), x["src"].hex(), x["path"]) for x in 
synth_rev["R_C"] ) assert rows["content_early_in_rev"] == relations( - provenance.cursor, "content", "revision" + provenance.storage.cursor, "content", "revision" ), synth_rev["msg"] # check timestamps for rc in synth_rev["R_C"]: - assert get_timestamp(provenance.cursor, "content", rc["dst"]) == [ + assert get_timestamp(provenance.storage.cursor, "content", rc["dst"]) == [ rev_ts + rc["rel_ts"] ], synth_rev["msg"] # check directories # each directory stored in the provenance index is an entry # in the "directory" table... rows["directory"] |= set(x["dst"].hex() for x in synth_rev["R_D"]) - assert rows["directory"] == sha1s(provenance.cursor, "directory"), synth_rev[ - "msg" - ] + assert rows["directory"] == sha1s( + provenance.storage.cursor, "directory" + ), synth_rev["msg"] # ... + a number of rows in the "directory_in_rev" table... # check for R-D entries rows["directory_in_rev"] |= set( (x["dst"].hex(), x["src"].hex(), x["path"]) for x in synth_rev["R_D"] ) assert rows["directory_in_rev"] == relations( - provenance.cursor, "directory", "revision" + provenance.storage.cursor, "directory", "revision" ), synth_rev["msg"] # check timestamps for rd in synth_rev["R_D"]: - assert get_timestamp(provenance.cursor, "directory", rd["dst"]) == [ + assert get_timestamp(provenance.storage.cursor, "directory", rd["dst"]) == [ rev_ts + rd["rel_ts"] ], synth_rev["msg"] # ... + a number of rows in the "content_in_dir" table # for content of the directory. # check for D-C entries rows["content_in_dir"] |= set( (x["dst"].hex(), x["src"].hex(), x["path"]) for x in synth_rev["D_C"] ) assert rows["content_in_dir"] == relations( - provenance.cursor, "content", "directory" + provenance.storage.cursor, "content", "directory" ), synth_rev["msg"] # check timestamps for dc in synth_rev["D_C"]: - assert get_timestamp(provenance.cursor, "content", dc["dst"]) == [ + assert get_timestamp(provenance.storage.cursor, "content", dc["dst"]) == [ rev_ts + dc["rel_ts"] ], synth_rev["msg"] # check for location entries rows["location"] |= set(x["path"] for x in synth_rev["R_C"]) rows["location"] |= set(x["path"] for x in synth_rev["D_C"]) rows["location"] |= set(x["path"] for x in synth_rev["R_D"]) - assert rows["location"] == locations(provenance.cursor), synth_rev["msg"] + assert rows["location"] == locations(provenance.storage.cursor), synth_rev[ + "msg" + ] @pytest.mark.parametrize( "repo, lower, mindepth", ( ("cmdbts2", True, 1), ("cmdbts2", False, 1), ("cmdbts2", True, 2), ("cmdbts2", False, 2), ("out-of-order", True, 1), ), ) @pytest.mark.parametrize("batch", (True, False)) def test_provenance_heuristics_content_find_all( provenance, swh_storage, archive, repo, lower, mindepth, batch ): # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) fill_storage(swh_storage, data) revisions = [ RevisionEntry( id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], ) for revision in data["revision"] ] if batch: revision_add(provenance, archive, revisions, lower=lower, mindepth=mindepth) else: for revision in revisions: revision_add( provenance, archive, [revision], lower=lower, mindepth=mindepth ) syntheticfile = get_datafile( f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt" ) expected_occurrences = {} for synth_rev in synthetic_result(syntheticfile): rev_id = synth_rev["sha1"].hex() rev_ts = synth_rev["date"] for rc in synth_rev["R_C"]: expected_occurrences.setdefault(rc["dst"].hex(), []).append( (rev_id, rev_ts, rc["path"]) ) for dc in 
synth_rev["D_C"]: assert dc["prefix"] is not None # to please mypy expected_occurrences.setdefault(dc["dst"].hex(), []).append( (rev_id, rev_ts, dc["prefix"] + "/" + dc["path"]) ) for content_id, results in expected_occurrences.items(): expected = [(content_id, *result) for result in results] db_occurrences = [ (blob.hex(), rev.hex(), date.timestamp(), path.decode()) for blob, rev, date, path in provenance.content_find_all( bytes.fromhex(content_id) ) ] assert len(db_occurrences) == len(expected) assert set(db_occurrences) == set(expected) @pytest.mark.parametrize( "repo, lower, mindepth", ( ("cmdbts2", True, 1), ("cmdbts2", False, 1), ("cmdbts2", True, 2), ("cmdbts2", False, 2), ("out-of-order", True, 1), ), ) def test_provenance_heuristics_content_find_first( provenance, swh_storage, archive, repo, lower, mindepth ): # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) fill_storage(swh_storage, data) revisions = [ RevisionEntry( id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], ) for revision in data["revision"] ] # XXX adding all revisions at once should be working just fine, but it does not... # revision_add(provenance, archive, revisions, lower=lower, mindepth=mindepth) # ...so add revisions one at a time for now for revision in revisions: revision_add(provenance, archive, [revision], lower=lower, mindepth=mindepth) syntheticfile = get_datafile( f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt" ) expected_first: Dict[str, Tuple[str, str, List[str]]] = {} # dict of tuples (blob_id, rev_id, [path, ...]) the third element for path # is a list because a content can be added at several places in a single # revision, in which case the result of content_find_first() is one of # those path, but we have no guarantee which one it will return. for synth_rev in synthetic_result(syntheticfile): rev_id = synth_rev["sha1"].hex() rev_ts = synth_rev["date"] for rc in synth_rev["R_C"]: sha1 = rc["dst"].hex() if sha1 not in expected_first: assert rc["rel_ts"] == 0 expected_first[sha1] = (rev_id, rev_ts, [rc["path"]]) else: if rev_ts == expected_first[sha1][1]: expected_first[sha1][2].append(rc["path"]) elif rev_ts < expected_first[sha1][1]: expected_first[sha1] = (rev_id, rev_ts, rc["path"]) for dc in synth_rev["D_C"]: sha1 = rc["dst"].hex() assert sha1 in expected_first # nothing to do there, this content cannot be a "first seen file" for content_id, (rev_id, ts, paths) in expected_first.items(): (r_sha1, r_rev_id, r_ts, r_path) = provenance.content_find_first( bytes.fromhex(content_id) ) assert r_sha1.hex() == content_id assert r_rev_id.hex() == rev_id assert r_ts.timestamp() == ts assert r_path.decode() in paths