diff --git a/swh/provenance/__init__.py b/swh/provenance/__init__.py
--- a/swh/provenance/__init__.py
+++ b/swh/provenance/__init__.py
@@ -24,17 +24,9 @@
 def get_provenance(cls: str, **kwargs) -> "ProvenanceInterface":
     if cls == "local":
         conn = connect(kwargs["db"])
-        if kwargs.get("with_path", True):
-            from swh.provenance.postgresql.provenancedb_with_path import (
-                ProvenanceWithPathDB,
-            )
-
-            return ProvenanceWithPathDB(conn)
-        else:
-            from swh.provenance.postgresql.provenancedb_without_path import (
-                ProvenanceWithoutPathDB,
-            )
-
-            return ProvenanceWithoutPathDB(conn)
+        with_path = kwargs.get("with_path", True)
+        from swh.provenance.provenance import ProvenanceBackend
+
+        return ProvenanceBackend(conn, with_path=with_path)
     else:
         raise NotImplementedError
diff --git a/swh/provenance/cli.py b/swh/provenance/cli.py
--- a/swh/provenance/cli.py
+++ b/swh/provenance/cli.py
@@ -129,8 +129,7 @@
     # TODO: add file size filtering
     """Process a provided list of revisions."""
     from . import get_archive, get_provenance
-    from .provenance import revision_add
-    from .revision import CSVRevisionIterator
+    from .revision import CSVRevisionIterator, revision_add
 
     archive = get_archive(**ctx.obj["config"]["archive"])
     provenance = get_provenance(**ctx.obj["config"]["provenance"])
@@ -157,8 +156,7 @@
 def iter_origins(ctx, filename, limit):
     """Process a provided list of origins."""
     from . import get_archive, get_provenance
-    from .origin import CSVOriginIterator
-    from .provenance import origin_add
+    from .origin import CSVOriginIterator, origin_add
 
     archive = get_archive(**ctx.obj["config"]["archive"])
     provenance = get_provenance(**ctx.obj["config"]["provenance"])
@@ -200,7 +198,7 @@
 @click.pass_context
 def find_all(ctx, swhid, limit):
     """Find all occurrences of the requested blob."""
-    from swh.provenance import get_provenance
+    from .
import get_provenance provenance = get_provenance(**ctx.obj["config"]["provenance"]) # TODO: return a dictionary with proper keys for each field diff --git a/swh/provenance/graph.py b/swh/provenance/graph.py new file mode 100644 --- /dev/null +++ b/swh/provenance/graph.py @@ -0,0 +1,223 @@ +from collections import Counter +from datetime import datetime, timezone +import logging +import os +from typing import Dict, List, Optional + +from swh.model.hashutil import hash_to_hex + +from .archive import ArchiveInterface +from .model import DirectoryEntry, RevisionEntry +from .provenance import ProvenanceInterface + +UTCMIN = datetime.min.replace(tzinfo=timezone.utc) + + +class IsochroneNode: + def __init__( + self, + entry: DirectoryEntry, + dbdate: Optional[datetime] = None, + depth: int = 0, + prefix: bytes = b"", + ): + self.entry = entry + self.depth = depth + + # dbdate is the maxdate for this node that comes from the DB + self._dbdate: Optional[datetime] = dbdate + + # maxdate is set by the maxdate computation algorithm + self.maxdate: Optional[datetime] = None + + # known is True if this node is already known in the db; either because + # the current directory actually exists in the database, or because all + # the content of the current directory is known (subdirectories and files) + self.known = self.dbdate is not None + self.invalid = False + self.path = os.path.join(prefix, self.entry.name) if prefix else self.entry.name + self.children: List[IsochroneNode] = [] + + @property + def dbdate(self): + # use a property to make this attribute (mostly) read-only + return self._dbdate + + def invalidate(self): + self._dbdate = None + self.maxdate = None + self.known = False + self.invalid = True + + def add_directory( + self, child: DirectoryEntry, date: Optional[datetime] = None + ) -> "IsochroneNode": + # we should not be processing this node (ie add subdirectories or + # files) if it's actually known by the provenance DB + assert self.dbdate is None + node = IsochroneNode(child, dbdate=date, depth=self.depth + 1, prefix=self.path) + self.children.append(node) + return node + + def __str__(self): + return ( + f"<{self.entry}: dbdate={self.dbdate}, maxdate={self.maxdate}, " + f"known={self.known}, invalid={self.invalid}, path={self.path}, " + f"children=[{', '.join(str(child) for child in self.children)}]>" + ) + + def __eq__(self, other): + return ( + isinstance(other, IsochroneNode) + and ( + self.entry, + self.depth, + self._dbdate, + self.maxdate, + self.known, + self.invalid, + self.path, + ) + == ( + other.entry, + other.depth, + other._dbdate, + other.maxdate, + other.known, + other.invalid, + other.path, + ) + and Counter(self.children) == Counter(other.children) + ) + + def __hash__(self): + return hash( + ( + self.entry, + self.depth, + self._dbdate, + self.maxdate, + self.known, + self.invalid, + self.path, + ) + ) + + +def build_isochrone_graph( + archive: ArchiveInterface, + provenance: ProvenanceInterface, + revision: RevisionEntry, + directory: DirectoryEntry, +) -> IsochroneNode: + assert revision.date is not None + assert revision.root == directory.id + + # this function process a revision in 2 steps: + # + # 1. 
build the tree structure of IsochroneNode objects (one INode per + # directory under the root directory of the revision but not following + # known subdirectories), and gather the dates from the DB for already + # known objects; for files, just keep all the dates in a global 'fdates' + # dict; note that in this step, we will only recurse the directories + # that are not already known. + # + # 2. compute the maxdate for each node of the tree that was not found in the DB. + + # Build the nodes structure + root_date = provenance.directory_get_date_in_isochrone_frontier(directory) + root = IsochroneNode(directory, dbdate=root_date) + stack = [root] + logging.debug( + f"Recursively creating graph for revision {hash_to_hex(revision.id)}..." + ) + fdates: Dict[bytes, datetime] = {} # map {file_id: date} + while stack: + current = stack.pop() + if current.dbdate is None or current.dbdate > revision.date: + # If current directory has an associated date in the isochrone frontier that + # is greater or equal to the current revision's one, it should be ignored as + # the revision is being processed out of order. + if current.dbdate is not None and current.dbdate > revision.date: + logging.debug( + f"Invalidating frontier on {hash_to_hex(current.entry.id)}" + f" (date {current.dbdate})" + f" when processing revision {hash_to_hex(revision.id)}" + f" (date {revision.date})" + ) + current.invalidate() + + # Pre-query all known dates for directories in the current directory + # for the provenance object to have them cached and (potentially) improve + # performance. + current.entry.retrieve_children(archive) + ddates = provenance.directory_get_dates_in_isochrone_frontier( + current.entry.dirs + ) + for dir in current.entry.dirs: + # Recursively analyse subdirectory nodes + node = current.add_directory(dir, date=ddates.get(dir.id, None)) + stack.append(node) + + fdates.update(provenance.content_get_early_dates(current.entry.files)) + + logging.debug( + f"Isochrone graph for revision {hash_to_hex(revision.id)} successfully created!" + ) + # Precalculate max known date for each node in the graph (only directory nodes are + # pushed to the stack). + logging.debug(f"Computing maxdates for revision {hash_to_hex(revision.id)}...") + stack = [root] + + while stack: + current = stack.pop() + # Current directory node is known if it already has an assigned date (ie. it was + # already seen as an isochrone frontier). + if current.known: + assert current.maxdate is None + current.maxdate = current.dbdate + else: + if any(x.maxdate is None for x in current.children): + # at least one child of current has no maxdate yet + # Current node needs to be analysed again after its children. + stack.append(current) + for child in current.children: + if child.maxdate is None: + # if child.maxdate is None, it must be processed + stack.append(child) + else: + # all the files and directories under current have a maxdate, + # we can infer the maxdate for current directory + assert current.maxdate is None + # if all content is already known, update current directory info. + current.maxdate = max( + [UTCMIN] + + [ + child.maxdate + for child in current.children + if child.maxdate is not None # unnecessary, but needed for mypy + ] + + [ + fdates.get(file.id, revision.date) + for file in current.entry.files + ] + ) + if current.maxdate <= revision.date: + current.known = ( + # true if all subdirectories are known + all(child.known for child in current.children) + # true if all files are in fdates, i.e. 
if all files were known + # *before building this isochrone graph node* + # Note: the 'all()' is lazy: will stop iterating as soon as + # possible + and all((file.id in fdates) for file in current.entry.files) + ) + else: + # at least one content is being processed out-of-order, then current + # node should be treated as unknown + current.maxdate = revision.date + current.known = False + logging.debug( + f"Maxdates for revision {hash_to_hex(revision.id)} successfully computed!" + ) + return root diff --git a/swh/provenance/origin.py b/swh/provenance/origin.py --- a/swh/provenance/origin.py +++ b/swh/provenance/origin.py @@ -1,13 +1,16 @@ from datetime import datetime, timezone from itertools import islice -from typing import Iterable, Iterator, Optional, Tuple +import logging +import time +from typing import Iterable, Iterator, List, Optional, Tuple import iso8601 -from .model import OriginEntry +from swh.model.hashutil import hash_to_hex -################################################################################ -################################################################################ +from .archive import ArchiveInterface +from .model import OriginEntry, RevisionEntry +from .provenance import ProvenanceInterface class CSVOriginIterator: @@ -39,3 +42,81 @@ for url, date, snap in self.statuses: date = iso8601.parse_date(date, default_timezone=timezone.utc) yield OriginEntry(url, date, snap) + + +def origin_add( + provenance: ProvenanceInterface, + archive: ArchiveInterface, + origins: List[OriginEntry], +) -> None: + start = time.time() + for origin in origins: + origin.retrieve_revisions(archive) + for revision in origin.revisions: + origin_add_revision(provenance, archive, origin, revision) + done = time.time() + provenance.commit() + stop = time.time() + logging.debug( + "Origins " + ";".join( + [origin.url + ":" + hash_to_hex(origin.snapshot) for origin in origins] + ) + + f" were processed in {stop - start} secs (commit took {stop - done} secs)!" + ) + + +def origin_add_revision( + provenance: ProvenanceInterface, + archive: ArchiveInterface, + origin: OriginEntry, + revision: RevisionEntry, +) -> None: + stack: List[Tuple[Optional[RevisionEntry], RevisionEntry]] = [(None, revision)] + origin.id = provenance.origin_get_id(origin) + + while stack: + relative, current = stack.pop() + + # Check if current revision has no preferred origin and update if necessary. + preferred = provenance.revision_get_preferred_origin(current) + + if preferred is None: + provenance.revision_set_preferred_origin(origin, current) + ######################################################################## + + if relative is None: + # This revision is pointed directly by the origin. + visited = provenance.revision_visited(current) + provenance.revision_add_to_origin(origin, current) + + if not visited: + stack.append((current, current)) + + else: + # This revision is a parent of another one in the history of the + # relative revision. + for parent in current.parents(archive): + visited = provenance.revision_visited(parent) + + if not visited: + # The parent revision has never been seen before pointing + # directly to an origin. + known = provenance.revision_in_history(parent) + + if known: + # The parent revision is already known in some other + # revision's history. We should point it directly to + # the origin and (eventually) walk its history. + stack.append((None, parent)) + else: + # The parent revision was never seen before. 
We should + # walk its history and associate it with the same + # relative revision. + provenance.revision_add_before_revision(relative, parent) + stack.append((relative, parent)) + else: + # The parent revision already points to an origin, so its + # history was properly processed before. We just need to + # make sure it points to the current origin as well. + provenance.revision_add_to_origin(origin, parent) diff --git a/swh/provenance/postgresql/provenancedb_base.py b/swh/provenance/postgresql/provenancedb_base.py --- a/swh/provenance/postgresql/provenancedb_base.py +++ b/swh/provenance/postgresql/provenancedb_base.py @@ -1,288 +1,141 @@ from datetime import datetime import itertools import logging -from typing import Any, Dict, Iterable, Optional +from typing import Any, Dict, Generator, List, Optional, Set, Tuple import psycopg2 import psycopg2.extras -from ..model import DirectoryEntry, FileEntry -from ..origin import OriginEntry -from ..revision import RevisionEntry - class ProvenanceDBBase: - raise_on_commit: bool = False - def __init__(self, conn: psycopg2.extensions.connection): - # TODO: consider adding a mutex for thread safety conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) conn.set_session(autocommit=True) self.conn = conn self.cursor = self.conn.cursor() # XXX: not sure this is the best place to do it! self.cursor.execute("SET timezone TO 'UTC'") - self.insert_cache: Dict[str, Any] = {} - self.select_cache: Dict[str, Any] = {} - self.clear_caches() - - def clear_caches(self): - self.insert_cache = { - "content": dict(), - "content_early_in_rev": set(), - "content_in_dir": set(), - "directory": dict(), - "directory_in_rev": set(), - "revision": dict(), - "revision_before_rev": list(), - "revision_in_org": list(), - } - self.select_cache = {"content": dict(), "directory": dict(), "revision": dict()} - def commit(self): + def commit(self, data: Dict[str, Any], raise_on_commit: bool = False) -> bool: try: - self.insert_all() - self.clear_caches() - return True + # First insert entities + self.insert_entity("content", data) + self.insert_entity("directory", data) + self.insert_entity("revision", data) + + # Relations should come after ids for entities were resolved + self.insert_relation( + "content", + "revision", + "content_early_in_rev", + data["content_early_in_rev"], + ) + self.insert_relation( + "content", "directory", "content_in_dir", data["content_in_dir"] + ) + self.insert_relation( + "directory", "revision", "directory_in_rev", data["directory_in_rev"] + ) + + # TODO: this should be updated when origin-revision layer gets properly + # updated. 
+ # if data["revision_before_rev"]: + # psycopg2.extras.execute_values( + # self.cursor, + # """ + # LOCK TABLE ONLY revision_before_rev; + # INSERT INTO revision_before_rev VALUES %s + # ON CONFLICT DO NOTHING + # """, + # data["revision_before_rev"], + # ) + # data["revision_before_rev"].clear() + # + # if data["revision_in_org"]: + # psycopg2.extras.execute_values( + # self.cursor, + # """ + # LOCK TABLE ONLY revision_in_org; + # INSERT INTO revision_in_org VALUES %s + # ON CONFLICT DO NOTHING + # """, + # data["revision_in_org"], + # ) + # data["revision_in_org"].clear() + return True except: # noqa: E722 # Unexpected error occurred, rollback all changes and log message logging.exception("Unexpected error") - if self.raise_on_commit: + if raise_on_commit: raise - return False - def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: - # First check if the date is being modified by current transection. - date = self.insert_cache["content"].get(blob.id, None) - if date is None: - # If not, check whether it's been query before - date = self.select_cache["content"].get(blob.id, None) - if date is None: - # Otherwise, query the database and cache the value - self.cursor.execute( - """SELECT date FROM content WHERE sha1=%s""", (blob.id,) - ) - row = self.cursor.fetchone() - date = row[0] if row is not None else None - self.select_cache["content"][blob.id] = date - return date - - def content_get_early_dates( - self, blobs: Iterable[FileEntry] - ) -> Dict[bytes, datetime]: + def get_dates(self, entity: str, ids: List[bytes]) -> Dict[bytes, datetime]: dates = {} - pending = [] - for blob in blobs: - # First check if the date is being modified by current transection. - date = self.insert_cache["content"].get(blob.id, None) - if date is not None: - dates[blob.id] = date - else: - # If not, check whether it's been query before - date = self.select_cache["content"].get(blob.id, None) - if date is not None: - dates[blob.id] = date - else: - pending.append(blob.id) - if pending: - # Otherwise, query the database and cache the values - values = ", ".join(itertools.repeat("%s", len(pending))) + if ids: + values = ", ".join(itertools.repeat("%s", len(ids))) self.cursor.execute( - f"""SELECT sha1, date FROM content WHERE sha1 IN ({values})""", - tuple(pending), + f"""SELECT sha1, date FROM {entity} WHERE sha1 IN ({values})""", + tuple(ids), ) - for sha1, date in self.cursor.fetchall(): - dates[sha1] = date - self.select_cache["content"][sha1] = date + dates.update(self.cursor.fetchall()) return dates - def content_set_early_date(self, blob: FileEntry, date: datetime): - self.insert_cache["content"][blob.id] = date - - def directory_get_date_in_isochrone_frontier( - self, directory: DirectoryEntry - ) -> Optional[datetime]: - # First check if the date is being modified by current transection. 
- date = self.insert_cache["directory"].get(directory.id, None) - if date is None: - # If not, check whether it's been query before - date = self.select_cache["directory"].get(directory.id, None) - if date is None: - # Otherwise, query the database and cache the value - self.cursor.execute( - """SELECT date FROM directory WHERE sha1=%s""", (directory.id,) - ) - row = self.cursor.fetchone() - date = row[0] if row is not None else None - self.select_cache["directory"][directory.id] = date - return date - - def directory_get_dates_in_isochrone_frontier( - self, dirs: Iterable[DirectoryEntry] - ) -> Dict[bytes, datetime]: - dates = {} - pending = [] - for directory in dirs: - # First check if the date is being modified by current transection. - date = self.insert_cache["directory"].get(directory.id, None) - if date is not None: - dates[directory.id] = date - else: - # If not, check whether it's been query before - date = self.select_cache["directory"].get(directory.id, None) - if date is not None: - dates[directory.id] = date - else: - pending.append(directory.id) - if pending: - # Otherwise, query the database and cache the values - values = ", ".join(itertools.repeat("%s", len(pending))) - self.cursor.execute( - f"""SELECT sha1, date FROM directory WHERE sha1 IN ({values})""", - tuple(pending), - ) - for sha1, date in self.cursor.fetchall(): - dates[sha1] = date - self.select_cache["directory"][sha1] = date - return dates - - def directory_set_date_in_isochrone_frontier( - self, directory: DirectoryEntry, date: datetime - ): - self.insert_cache["directory"][directory.id] = date - - def insert_all(self): - # Performe insertions with cached information - if self.insert_cache["content"]: - psycopg2.extras.execute_values( - self.cursor, - """ - LOCK TABLE ONLY content; - INSERT INTO content(sha1, date) VALUES %s - ON CONFLICT (sha1) DO - UPDATE SET date=LEAST(EXCLUDED.date,content.date) - """, - self.insert_cache["content"].items(), - ) - self.insert_cache["content"].clear() - - if self.insert_cache["directory"]: - psycopg2.extras.execute_values( - self.cursor, - """ - LOCK TABLE ONLY directory; - INSERT INTO directory(sha1, date) VALUES %s - ON CONFLICT (sha1) DO - UPDATE SET date=LEAST(EXCLUDED.date,directory.date) - """, - self.insert_cache["directory"].items(), - ) - self.insert_cache["directory"].clear() - - if self.insert_cache["revision"]: + def insert_entity(self, entity: str, data: Dict[str, Any]): + if data[entity]: psycopg2.extras.execute_values( self.cursor, - """ - LOCK TABLE ONLY revision; - INSERT INTO revision(sha1, date) VALUES %s - ON CONFLICT (sha1) DO - UPDATE SET date=LEAST(EXCLUDED.date,revision.date) + f""" + LOCK TABLE ONLY {entity}; + INSERT INTO {entity}(sha1, date) VALUES %s + ON CONFLICT (sha1) DO + UPDATE SET date=LEAST(EXCLUDED.date,{entity}.date) """, - self.insert_cache["revision"].items(), + data[entity].items(), ) - self.insert_cache["revision"].clear() - - # Relations should come after ids for elements were resolved - if self.insert_cache["content_early_in_rev"]: - self.insert_location("content", "revision", "content_early_in_rev") - - if self.insert_cache["content_in_dir"]: - self.insert_location("content", "directory", "content_in_dir") - - if self.insert_cache["directory_in_rev"]: - self.insert_location("directory", "revision", "directory_in_rev") - - # if self.insert_cache["revision_before_rev"]: - # psycopg2.extras.execute_values( - # self.cursor, - # """ - # LOCK TABLE ONLY revision_before_rev; - # INSERT INTO revision_before_rev VALUES %s - # ON CONFLICT 
DO NOTHING - # """, - # self.insert_cache["revision_before_rev"], - # ) - # self.insert_cache["revision_before_rev"].clear() - - # if self.insert_cache["revision_in_org"]: - # psycopg2.extras.execute_values( - # self.cursor, - # """ - # LOCK TABLE ONLY revision_in_org; - # INSERT INTO revision_in_org VALUES %s - # ON CONFLICT DO NOTHING - # """, - # self.insert_cache["revision_in_org"], - # ) - # self.insert_cache["revision_in_org"].clear() + # XXX: not sure if Python takes a reference or a copy. + # This might be useless! + data[entity].clear() - def origin_get_id(self, origin: OriginEntry) -> int: - if origin.id is None: - # Insert origin in the DB and return the assigned id - self.cursor.execute( - """ - LOCK TABLE ONLY origin; - INSERT INTO origin(url) VALUES (%s) - ON CONFLICT DO NOTHING - RETURNING id - """, - (origin.url,), - ) - return self.cursor.fetchone()[0] - else: - return origin.id - - def revision_add(self, revision: RevisionEntry): - # Add current revision to the compact DB - self.insert_cache["revision"][revision.id] = revision.date - - def revision_add_before_revision( - self, relative: RevisionEntry, revision: RevisionEntry + def insert_relation( + self, src: str, dst: str, relation: str, data: Set[Tuple[bytes, bytes, bytes]] ): - self.insert_cache["revision_before_rev"].append((revision.id, relative.id)) + ... - def revision_add_to_origin(self, origin: OriginEntry, revision: RevisionEntry): - self.insert_cache["revision_in_org"].append((revision.id, origin.id)) + def content_find_first( + self, blob: bytes + ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]: + ... - def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]: - date = self.insert_cache["revision"].get(revision.id, None) - if date is None: - # If not, check whether it's been query before - date = self.select_cache["revision"].get(revision.id, None) - if date is None: - # Otherwise, query the database and cache the value - self.cursor.execute( - """SELECT date FROM revision WHERE sha1=%s""", (revision.id,) - ) - row = self.cursor.fetchone() - date = row[0] if row is not None else None - self.select_cache["revision"][revision.id] = date - return date + def content_find_all( + self, blob: bytes, limit: Optional[int] = None + ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]: + ... 
- def revision_get_preferred_origin(self, revision: RevisionEntry) -> int: - # TODO: adapt this method to consider cached values + def origin_get_id(self, url: str) -> int: + # Insert origin in the DB and return the assigned id self.cursor.execute( - """SELECT COALESCE(org,0) FROM revision WHERE sha1=%s""", (revision.id,) + """ + LOCK TABLE ONLY origin; + INSERT INTO origin(url) VALUES (%s) + ON CONFLICT DO NOTHING + RETURNING id + """, + (url,), + ) + return self.cursor.fetchone()[0] + + def revision_get_preferred_origin(self, revision: bytes) -> int: + self.cursor.execute( + """SELECT COALESCE(org,0) FROM revision WHERE sha1=%s""", (revision,) ) row = self.cursor.fetchone() # None means revision is not in database; # 0 means revision has no preferred origin return row[0] if row is not None and row[0] != 0 else None - def revision_in_history(self, revision: RevisionEntry) -> bool: - # TODO: adapt this method to consider cached values + def revision_in_history(self, revision: bytes) -> bool: self.cursor.execute( """ SELECT 1 @@ -291,20 +144,16 @@ ON revision.id=revision_before_rev.prev WHERE revision.sha1=%s """, - (revision.id,), + (revision,), ) return self.cursor.fetchone() is not None - def revision_set_preferred_origin( - self, origin: OriginEntry, revision: RevisionEntry - ): - # TODO: adapt this method to consider cached values + def revision_set_preferred_origin(self, origin: int, revision: bytes): self.cursor.execute( - """UPDATE revision SET org=%s WHERE sha1=%s""", (origin.id, revision.id) + """UPDATE revision SET org=%s WHERE sha1=%s""", (origin, revision) ) - def revision_visited(self, revision: RevisionEntry) -> bool: - # TODO: adapt this method to consider cached values + def revision_visited(self, revision: bytes) -> bool: self.cursor.execute( """ SELECT 1 @@ -313,6 +162,6 @@ ON revision.id=revision_in_org.rev WHERE revision.sha1=%s """, - (revision.id,), + (revision,), ) return self.cursor.fetchone() is not None diff --git a/swh/provenance/postgresql/provenancedb_with_path.py b/swh/provenance/postgresql/provenancedb_with_path.py --- a/swh/provenance/postgresql/provenancedb_with_path.py +++ b/swh/provenance/postgresql/provenancedb_with_path.py @@ -1,36 +1,15 @@ from datetime import datetime -import os -from typing import Generator, Optional, Tuple +from typing import Generator, Optional, Set, Tuple import psycopg2 import psycopg2.extras -from ..model import DirectoryEntry, FileEntry -from ..revision import RevisionEntry from .provenancedb_base import ProvenanceDBBase -def normalize(path: bytes) -> bytes: - return path[2:] if path.startswith(bytes("." 
+ os.path.sep, "utf-8")) else path - - class ProvenanceWithPathDB(ProvenanceDBBase): - def content_add_to_directory( - self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes - ): - self.insert_cache["content_in_dir"].add( - (blob.id, directory.id, normalize(os.path.join(prefix, blob.name))) - ) - - def content_add_to_revision( - self, revision: RevisionEntry, blob: FileEntry, prefix: bytes - ): - self.insert_cache["content_early_in_rev"].add( - (blob.id, revision.id, normalize(os.path.join(prefix, blob.name))) - ) - def content_find_first( - self, blobid: bytes + self, blob: bytes ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]: self.cursor.execute( """ @@ -45,12 +24,12 @@ WHERE C.sha1=%s ORDER BY date, rev, path ASC LIMIT 1 """, - (blobid,), + (blob,), ) return self.cursor.fetchone() def content_find_all( - self, blobid: bytes, limit: Optional[int] = None + self, blob: bytes, limit: Optional[int] = None ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]: early_cut = f"LIMIT {limit}" if limit is not None else "" self.cursor.execute( @@ -82,76 +61,72 @@ WHERE C.sha1=%s) ORDER BY date, rev, path {early_cut} """, - (blobid, blobid), + (blob, blob), ) # TODO: use POSTGRESQL EXPLAIN looking for query optimizations. yield from self.cursor.fetchall() - def directory_add_to_revision( - self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes + def insert_relation( + self, src: str, dst: str, relation: str, data: Set[Tuple[bytes, bytes, bytes]] ): - self.insert_cache["directory_in_rev"].add( - (directory.id, revision.id, normalize(path)) - ) - - def insert_location(self, src0_table, src1_table, dst_table): - """Insert location entries in `dst_table` from the insert_cache + """Insert entries in `relation` from `data` Also insert missing location entries in the 'location' table. 
""" - # TODO: find a better way of doing this; might be doable in a coupls of - # SQL queries (one to insert missing entries in the location' table, - # one to insert entries in the dst_table) - - # Resolve src0 ids - src0_sha1s = tuple(set(sha1 for (sha1, _, _) in self.insert_cache[dst_table])) - fmt = ",".join(["%s"] * len(src0_sha1s)) - self.cursor.execute( - f"""SELECT sha1, id FROM {src0_table} WHERE sha1 IN ({fmt})""", - src0_sha1s, - ) - src0_values = dict(self.cursor.fetchall()) - - # Resolve src1 ids - src1_sha1s = tuple(set(sha1 for (_, sha1, _) in self.insert_cache[dst_table])) - fmt = ",".join(["%s"] * len(src1_sha1s)) - self.cursor.execute( - f"""SELECT sha1, id FROM {src1_table} WHERE sha1 IN ({fmt})""", - src1_sha1s, - ) - src1_values = dict(self.cursor.fetchall()) - - # insert missing locations - locations = tuple(set((loc,) for (_, _, loc) in self.insert_cache[dst_table])) - psycopg2.extras.execute_values( - self.cursor, - """ - LOCK TABLE ONLY location; - INSERT INTO location(path) VALUES %s - ON CONFLICT (path) DO NOTHING - """, - locations, - ) - # fetch location ids - fmt = ",".join(["%s"] * len(locations)) - self.cursor.execute( - f"SELECT path, id FROM location WHERE path IN ({fmt})", - locations, - ) - loc_ids = dict(self.cursor.fetchall()) - - # Insert values in dst_table - rows = [ - (src0_values[sha1_src], src1_values[sha1_dst], loc_ids[loc]) - for (sha1_src, sha1_dst, loc) in self.insert_cache[dst_table] - ] - psycopg2.extras.execute_values( - self.cursor, - f""" - LOCK TABLE ONLY {dst_table}; - INSERT INTO {dst_table} VALUES %s - ON CONFLICT DO NOTHING - """, - rows, - ) - self.insert_cache[dst_table].clear() + if data: + # TODO: find a better way of doing this; might be doable in a couple of + # SQL queries (one to insert missing entries in the location' table, + # one to insert entries in the relation) + + # Resolve src ids + src_sha1s = tuple(set(sha1 for (sha1, _, _) in data)) + fmt = ",".join(["%s"] * len(src_sha1s)) + self.cursor.execute( + f"""SELECT sha1, id FROM {src} WHERE sha1 IN ({fmt})""", + src_sha1s, + ) + src_values = dict(self.cursor.fetchall()) + + # Resolve dst ids + dst_sha1s = tuple(set(sha1 for (_, sha1, _) in data)) + fmt = ",".join(["%s"] * len(dst_sha1s)) + self.cursor.execute( + f"""SELECT sha1, id FROM {dst} WHERE sha1 IN ({fmt})""", + dst_sha1s, + ) + dst_values = dict(self.cursor.fetchall()) + + # insert missing locations + locations = tuple(set((loc,) for (_, _, loc) in data)) + psycopg2.extras.execute_values( + self.cursor, + """ + LOCK TABLE ONLY location; + INSERT INTO location(path) VALUES %s + ON CONFLICT (path) DO NOTHING + """, + locations, + ) + # fetch location ids + fmt = ",".join(["%s"] * len(locations)) + self.cursor.execute( + f"SELECT path, id FROM location WHERE path IN ({fmt})", + locations, + ) + loc_ids = dict(self.cursor.fetchall()) + + # Insert values in relation + rows = [ + (src_values[sha1_src], dst_values[sha1_dst], loc_ids[loc]) + for (sha1_src, sha1_dst, loc) in data + ] + psycopg2.extras.execute_values( + self.cursor, + f""" + LOCK TABLE ONLY {relation}; + INSERT INTO {relation} VALUES %s + ON CONFLICT DO NOTHING + """, + rows, + ) + data.clear() diff --git a/swh/provenance/postgresql/provenancedb_without_path.py b/swh/provenance/postgresql/provenancedb_without_path.py --- a/swh/provenance/postgresql/provenancedb_without_path.py +++ b/swh/provenance/postgresql/provenancedb_without_path.py @@ -1,13 +1,11 @@ from datetime import datetime import itertools import operator -from typing import Generator, 
Optional, Tuple +from typing import Generator, Optional, Set, Tuple import psycopg2 import psycopg2.extras -from ..model import DirectoryEntry, FileEntry -from ..revision import RevisionEntry from .provenancedb_base import ProvenanceDBBase ######################################################################################## @@ -16,18 +14,8 @@ class ProvenanceWithoutPathDB(ProvenanceDBBase): - def content_add_to_directory( - self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes - ): - self.insert_cache["content_in_dir"].add((blob.id, directory.id)) - - def content_add_to_revision( - self, revision: RevisionEntry, blob: FileEntry, prefix: bytes - ): - self.insert_cache["content_early_in_rev"].add((blob.id, revision.id)) - def content_find_first( - self, blobid: bytes + self, blob: bytes ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]: self.cursor.execute( """ @@ -43,17 +31,17 @@ ON revision.id=content_in_rev.rev ORDER BY date, rev ASC LIMIT 1 """, - (blobid,), + (blob,), ) row = self.cursor.fetchone() if row is not None: - # TODO: query revision from the archive and look for blobid into a + # TODO: query revision from the archive and look for blob into a # recursive directory_ls of the revision's root. - return blobid, row[0], row[1], b"" + return blob, row[0], row[1], b"" return None def content_find_all( - self, blobid: bytes, limit: Optional[int] = None + self, blob: bytes, limit: Optional[int] = None ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]: early_cut = f"LIMIT {limit}" if limit is not None else "" self.cursor.execute( @@ -87,54 +75,48 @@ ) ORDER BY date, rev {early_cut} """, - (blobid, blobid), + (blob, blob), ) # TODO: use POSTGRESQL EXPLAIN looking for query optimizations. for row in self.cursor.fetchall(): - # TODO: query revision from the archive and look for blobid into a + # TODO: query revision from the archive and look for blob into a # recursive directory_ls of the revision's root. 
- yield blobid, row[0], row[1], b"" + yield blob, row[0], row[1], b"" - def directory_add_to_revision( - self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes + def insert_relation( + self, src: str, dst: str, relation: str, data: Set[Tuple[bytes, bytes, bytes]] ): - self.insert_cache["directory_in_rev"].add((directory.id, revision.id)) - - def insert_location(self, src0_table, src1_table, dst_table): - # Resolve src0 ids - src0_values = dict().fromkeys( - map(operator.itemgetter(0), self.insert_cache[dst_table]) - ) - values = ", ".join(itertools.repeat("%s", len(src0_values))) - self.cursor.execute( - f"""SELECT sha1, id FROM {src0_table} WHERE sha1 IN ({values})""", - tuple(src0_values), - ) - src0_values = dict(self.cursor.fetchall()) + if data: + # Resolve src ids + src_values = dict().fromkeys(map(operator.itemgetter(0), data)) + values = ", ".join(itertools.repeat("%s", len(src_values))) + self.cursor.execute( + f"""SELECT sha1, id FROM {src} WHERE sha1 IN ({values})""", + tuple(src_values), + ) + src_values = dict(self.cursor.fetchall()) - # Resolve src1 ids - src1_values = dict().fromkeys( - map(operator.itemgetter(1), self.insert_cache[dst_table]) - ) - values = ", ".join(itertools.repeat("%s", len(src1_values))) - self.cursor.execute( - f"""SELECT sha1, id FROM {src1_table} WHERE sha1 IN ({values})""", - tuple(src1_values), - ) - src1_values = dict(self.cursor.fetchall()) + # Resolve dst ids + dst_values = dict().fromkeys(map(operator.itemgetter(1), data)) + values = ", ".join(itertools.repeat("%s", len(dst_values))) + self.cursor.execute( + f"""SELECT sha1, id FROM {dst} WHERE sha1 IN ({values})""", + tuple(dst_values), + ) + dst_values = dict(self.cursor.fetchall()) - # Insert values in dst_table - rows = map( - lambda row: (src0_values[row[0]], src1_values[row[1]]), - self.insert_cache[dst_table], - ) - psycopg2.extras.execute_values( - self.cursor, - f""" - LOCK TABLE ONLY {dst_table}; - INSERT INTO {dst_table} VALUES %s - ON CONFLICT DO NOTHING - """, - rows, - ) - self.insert_cache[dst_table].clear() + # Insert values in relation + rows = map( + lambda row: (src_values[row[0]], dst_values[row[1]]), + data, + ) + psycopg2.extras.execute_values( + self.cursor, + f""" + LOCK TABLE ONLY {relation}; + INSERT INTO {relation} VALUES %s + ON CONFLICT DO NOTHING + """, + rows, + ) + data.clear() diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py --- a/swh/provenance/provenance.py +++ b/swh/provenance/provenance.py @@ -1,20 +1,16 @@ -from collections import Counter -from datetime import datetime, timezone +from datetime import datetime import logging import os -import time -from typing import Dict, Generator, Iterable, List, Optional, Tuple +from typing import Any, Dict, Generator, Iterable, List, Optional, Tuple +import psycopg2 from typing_extensions import Protocol, runtime_checkable -from swh.model.hashutil import hash_to_hex - -from .archive import ArchiveInterface from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry -UTCMIN = datetime.min.replace(tzinfo=timezone.utc) - +# XXX: this protocol doesn't make much sense now that flavours have been delegated to +# another class, lower in the callstack. @runtime_checkable class ProvenanceInterface(Protocol): raise_on_commit: bool = False @@ -34,12 +30,12 @@ ... def content_find_first( - self, blobid: bytes + self, blob: bytes ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]: ... 
def content_find_all( - self, blobid: bytes, limit: Optional[int] = None + self, blob: bytes, limit: Optional[int] = None ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]: ... @@ -108,496 +104,167 @@ ... -def flatten_directory( - archive: ArchiveInterface, - provenance: ProvenanceInterface, - directory: DirectoryEntry, -) -> None: - """Recursively retrieve all the files of 'directory' and insert them in the - 'provenance' database in the 'content_to_directory' table. - """ - stack = [(directory, b"")] - while stack: - current, prefix = stack.pop() - current.retrieve_children(archive) - for f_child in current.files: - # Add content to the directory with the computed prefix. - provenance.content_add_to_directory(directory, f_child, prefix) - for d_child in current.dirs: - # Recursively walk the child directory. - stack.append((d_child, os.path.join(prefix, d_child.name))) - - -def origin_add( - provenance: ProvenanceInterface, - archive: ArchiveInterface, - origins: List[OriginEntry], -) -> None: - start = time.time() - for origin in origins: - origin.retrieve_revisions(archive) - for revision in origin.revisions: - origin_add_revision(provenance, archive, origin, revision) - done = time.time() - provenance.commit() - stop = time.time() - logging.debug( - "Origins " - ";".join( - [origin.url + ":" + hash_to_hex(origin.snapshot) for origin in origins] - ) - + f" were processed in {stop - start} secs (commit took {stop - done} secs)!" - ) - - -def origin_add_revision( - provenance: ProvenanceInterface, - archive: ArchiveInterface, - origin: OriginEntry, - revision: RevisionEntry, -) -> None: - stack: List[Tuple[Optional[RevisionEntry], RevisionEntry]] = [(None, revision)] - - while stack: - relative, current = stack.pop() - - # Check if current revision has no preferred origin and update if necessary. - preferred = provenance.revision_get_preferred_origin(current) - - if preferred is None: - provenance.revision_set_preferred_origin(origin, current) - ######################################################################## +# TODO: maybe move this to a separate file +class ProvenanceBackend: + raise_on_commit: bool = False - if relative is None: - # This revision is pointed directly by the origin. - visited = provenance.revision_visited(current) - provenance.revision_add_to_origin(origin, current) + def __init__(self, conn: psycopg2.extensions.connection, with_path: bool = True): + from .postgresql.provenancedb_base import ProvenanceDBBase - if not visited: - stack.append((current, current)) + # TODO: this class should not know what the actual used DB is. + self.storage: ProvenanceDBBase + if with_path: + from .postgresql.provenancedb_with_path import ProvenanceWithPathDB + self.storage = ProvenanceWithPathDB(conn) else: - # This revision is a parent of another one in the history of the - # relative revision. - for parent in current.parents(archive): - visited = provenance.revision_visited(parent) - - if not visited: - # The parent revision has never been seen before pointing - # directly to an origin. - known = provenance.revision_in_history(parent) - - if known: - # The parent revision is already known in some other - # revision's history. We should point it directly to - # the origin and (eventually) walk its history. - stack.append((None, parent)) - else: - # The parent revision was never seen before. We should - # walk its history and associate it with the same - # relative revision. 
- provenance.revision_add_before_revision(relative, parent) - stack.append((relative, parent)) - else: - # The parent revision already points to an origin, so its - # history was properly processed before. We just need to - # make sure it points to the current origin as well. - provenance.revision_add_to_origin(origin, parent) - - -def revision_add( - provenance: ProvenanceInterface, - archive: ArchiveInterface, - revisions: List[RevisionEntry], - trackall: bool = True, - lower: bool = True, - mindepth: int = 1, - commit: bool = True, -) -> None: - start = time.time() - for revision in revisions: - assert revision.date is not None - assert revision.root is not None - # Processed content starting from the revision's root directory. - date = provenance.revision_get_early_date(revision) - if date is None or revision.date < date: - logging.debug( - f"Processing revisions {hash_to_hex(revision.id)}" - f" (known date {date} / revision date {revision.date})..." - ) - graph = build_isochrone_graph( - archive, - provenance, - revision, - DirectoryEntry(revision.root), - ) - # TODO: add file size filtering - revision_process_content( - archive, - provenance, - revision, - graph, - trackall=trackall, - lower=lower, - mindepth=mindepth, - ) - done = time.time() - if commit: - # TODO: improve this! Maybe using a max attempt counter? - # Ideally Provenance class should guarantee that a commit never fails. - while not provenance.commit(): + from .postgresql.provenancedb_without_path import ProvenanceWithoutPathDB + + self.storage = ProvenanceWithoutPathDB(conn) + + self.write_cache: Dict[str, Any] = {} + self.read_cache: Dict[str, Any] = {} + self.clear_caches() + + def clear_caches(self): + self.write_cache = { + "content": dict(), + "content_early_in_rev": set(), + "content_in_dir": set(), + "directory": dict(), + "directory_in_rev": set(), + "revision": dict(), + "revision_before_rev": list(), + "revision_in_org": list(), + } + self.read_cache = {"content": dict(), "directory": dict(), "revision": dict()} + + def commit(self): + # TODO: for now we just forward the write_cache. This should be improved! + while not self.storage.commit( + self.write_cache, raise_on_commit=self.raise_on_commit + ): logging.warning( - "Could not commit revisions " - + ";".join([hash_to_hex(revision.id) for revision in revisions]) - + ". Retrying..." + f"Unable to commit cached information {self.write_cache}. Retrying..." ) - stop = time.time() - logging.debug( - f"Revisions {';'.join([hash_to_hex(revision.id) for revision in revisions])} " - f" were processed in {stop - start} secs (commit took {stop - done} secs)!" 
- ) - # logging.critical( - # ";".join([hash_to_hex(revision.id) for revision in revisions]) - # + f",{stop - start},{stop - done}" - # ) - - -class IsochroneNode: - def __init__( - self, - entry: DirectoryEntry, - dbdate: Optional[datetime] = None, - depth: int = 0, - prefix: bytes = b"", + self.clear_caches() + + def content_add_to_directory( + self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes ): - self.entry = entry - self.depth = depth - - # dbdate is the maxdate for this node that comes from the DB - self._dbdate: Optional[datetime] = dbdate - - # maxdate is set by the maxdate computation algorithm - self.maxdate: Optional[datetime] = None - - # known is True if this node is already known in the db; either because - # the current directory actually exists in the database, or because all - # the content of the current directory is known (subdirectories and files) - self.known = self.dbdate is not None - self.invalid = False - self.path = os.path.join(prefix, self.entry.name) if prefix else self.entry.name - self.children: List[IsochroneNode] = [] - - @property - def dbdate(self): - # use a property to make this attribute (mostly) read-only - return self._dbdate - - def invalidate(self): - self._dbdate = None - self.maxdate = None - self.known = False - self.invalid = True - - def add_directory( - self, child: DirectoryEntry, date: Optional[datetime] = None - ) -> "IsochroneNode": - # we should not be processing this node (ie add subdirectories or - # files) if it's actually known by the provenance DB - assert self.dbdate is None - node = IsochroneNode(child, dbdate=date, depth=self.depth + 1, prefix=self.path) - self.children.append(node) - return node - - def __str__(self): - return ( - f"<{self.entry}: dbdate={self.dbdate}, maxdate={self.maxdate}, " - f"known={self.known}, invalid={self.invalid}, path={self.path}, " - f"children=[{', '.join(str(child) for child in self.children)}]>" + self.write_cache["content_in_dir"].add( + (blob.id, directory.id, normalize(os.path.join(prefix, blob.name))) ) - def __eq__(self, other): - return ( - isinstance(other, IsochroneNode) - and ( - self.entry, - self.depth, - self._dbdate, - self.maxdate, - self.known, - self.invalid, - self.path, - ) - == ( - other.entry, - other.depth, - other._dbdate, - other.maxdate, - other.known, - other.invalid, - other.path, - ) - and Counter(self.children) == Counter(other.children) + def content_add_to_revision( + self, revision: RevisionEntry, blob: FileEntry, prefix: bytes + ): + self.write_cache["content_early_in_rev"].add( + (blob.id, revision.id, normalize(os.path.join(prefix, blob.name))) ) - def __hash__(self): - return hash( - ( - self.entry, - self.depth, - self._dbdate, - self.maxdate, - self.known, - self.invalid, - self.path, - ) + def content_find_first( + self, blob: bytes + ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]: + return self.storage.content_find_first(blob) + + def content_find_all( + self, blob: bytes, limit: Optional[int] = None + ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]: + yield from self.storage.content_find_all(blob, limit=limit) + + def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: + return self.get_dates("content", [blob.id]).get(blob.id, None) + + def content_get_early_dates( + self, blobs: Iterable[FileEntry] + ) -> Dict[bytes, datetime]: + return self.get_dates("content", [blob.id for blob in blobs]) + + def content_set_early_date(self, blob: FileEntry, date: datetime): + self.write_cache["content"][blob.id] = 
date + # update read cache as well + self.read_cache["content"][blob.id] = date + + def directory_add_to_revision( + self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes + ): + self.write_cache["directory_in_rev"].add( + (directory.id, revision.id, normalize(path)) ) + def directory_get_date_in_isochrone_frontier( + self, directory: DirectoryEntry + ) -> Optional[datetime]: + return self.get_dates("directory", [directory.id]).get(directory.id, None) -def build_isochrone_graph( - archive: ArchiveInterface, - provenance: ProvenanceInterface, - revision: RevisionEntry, - directory: DirectoryEntry, -) -> IsochroneNode: - assert revision.date is not None - assert revision.root == directory.id - - # this function process a revision in 2 steps: - # - # 1. build the tree structure of IsochroneNode objects (one INode per - # directory under the root directory of the revision but not following - # known subdirectories), and gather the dates from the DB for already - # known objects; for files, just keep all the dates in a global 'fdates' - # dict; note that in this step, we will only recurse the directories - # that are not already known. - # - # 2. compute the maxdate for each node of the tree that was not found in the DB. - - # Build the nodes structure - root_date = provenance.directory_get_date_in_isochrone_frontier(directory) - root = IsochroneNode(directory, dbdate=root_date) - stack = [root] - logging.debug( - f"Recursively creating graph for revision {hash_to_hex(revision.id)}..." - ) - fdates: Dict[bytes, datetime] = {} # map {file_id: date} - while stack: - current = stack.pop() - if current.dbdate is None or current.dbdate > revision.date: - # If current directory has an associated date in the isochrone frontier that - # is greater or equal to the current revision's one, it should be ignored as - # the revision is being processed out of order. - if current.dbdate is not None and current.dbdate > revision.date: - logging.debug( - f"Invalidating frontier on {hash_to_hex(current.entry.id)}" - f" (date {current.dbdate})" - f" when processing revision {hash_to_hex(revision.id)}" - f" (date {revision.date})" - ) - current.invalidate() - - # Pre-query all known dates for directories in the current directory - # for the provenance object to have them cached and (potentially) improve - # performance. - current.entry.retrieve_children(archive) - ddates = provenance.directory_get_dates_in_isochrone_frontier( - current.entry.dirs - ) - for dir in current.entry.dirs: - # Recursively analyse subdirectory nodes - node = current.add_directory(dir, date=ddates.get(dir.id, None)) - stack.append(node) - - fdates.update(provenance.content_get_early_dates(current.entry.files)) - - logging.debug( - f"Isochrone graph for revision {hash_to_hex(revision.id)} successfully created!" - ) - # Precalculate max known date for each node in the graph (only directory nodes are - # pushed to the stack). - logging.debug(f"Computing maxdates for revision {hash_to_hex(revision.id)}...") - stack = [root] - - while stack: - current = stack.pop() - # Current directory node is known if it already has an assigned date (ie. it was - # already seen as an isochrone frontier). - if current.known: - assert current.maxdate is None - current.maxdate = current.dbdate - else: - if any(x.maxdate is None for x in current.children): - # at least one child of current has no maxdate yet - # Current node needs to be analysed again after its children. 
- stack.append(current) - for child in current.children: - if child.maxdate is None: - # if child.maxdate is None, it must be processed - stack.append(child) + def directory_get_dates_in_isochrone_frontier( + self, dirs: Iterable[DirectoryEntry] + ) -> Dict[bytes, datetime]: + return self.get_dates("directory", [directory.id for directory in dirs]) + + def directory_set_date_in_isochrone_frontier( + self, directory: DirectoryEntry, date: datetime + ): + self.write_cache["directory"][directory.id] = date + # update read cache as well + self.read_cache["directory"][directory.id] = date + + def get_dates(self, entity: str, ids: List[bytes]) -> Dict[bytes, datetime]: + dates = {} + pending = [] + for sha1 in ids: + # Check whether the date has been queried before + date = self.read_cache[entity].get(sha1, None) + if date is not None: + dates[sha1] = date else: - # all the files and directories under current have a maxdate, - # we can infer the maxdate for current directory - assert current.maxdate is None - # if all content is already known, update current directory info. - current.maxdate = max( - [UTCMIN] - + [ - child.maxdate - for child in current.children - if child.maxdate is not None # unnecessary, but needed for mypy - ] - + [ - fdates.get(file.id, revision.date) - for file in current.entry.files - ] - ) - if current.maxdate <= revision.date: - current.known = ( - # true if all subdirectories are known - all(child.known for child in current.children) - # true if all files are in fdates, i.e. if all files were known - # *before building this isochrone graph node* - # Note: the 'all()' is lazy: will stop iterating as soon as - # possible - and all((file.id in fdates) for file in current.entry.files) - ) - else: - # at least one content is being processed out-of-order, then current - # node should be treated as unknown - current.maxdate = revision.date - current.known = False - logging.debug( - f"Maxdates for revision {hash_to_hex(revision.id)} successfully computed!" - ) - return root - - -def revision_process_content( - archive: ArchiveInterface, - provenance: ProvenanceInterface, - revision: RevisionEntry, - graph: IsochroneNode, - trackall: bool = True, - lower: bool = True, - mindepth: int = 1, -): - assert revision.date is not None - provenance.revision_add(revision) - - stack = [graph] - while stack: - current = stack.pop() - if current.dbdate is not None: - assert current.dbdate <= revision.date - if trackall: - # Current directory is an outer isochrone frontier for a previously - # processed revision. It should be reused as is. - provenance.directory_add_to_revision( - revision, current.entry, current.path - ) + pending.append(sha1) + dates.update(self.storage.get_dates(entity, pending)) + return dates + + def origin_get_id(self, origin: OriginEntry) -> int: + if origin.id is None: + return self.storage.origin_get_id(origin.url) else: - assert current.maxdate is not None - # Current directory is not an outer isochrone frontier for any previous - # revision. It might be eligible for this one. - if is_new_frontier( - current, - revision=revision, - trackall=trackall, - lower=lower, - mindepth=mindepth, - ): - # Outer frontier should be moved to current position in the isochrone - # graph. This is the first time this directory is found in the isochrone - # frontier. 
- provenance.directory_set_date_in_isochrone_frontier( - current.entry, current.maxdate - ) - if trackall: - provenance.directory_add_to_revision( - revision, current.entry, current.path - ) - flatten_directory(archive, provenance, current.entry) - else: - # If current node is an invalidated frontier, update its date for future - # revisions to get the proper value. - if current.invalid: - provenance.directory_set_date_in_isochrone_frontier( - current.entry, current.maxdate - ) - # No point moving the frontier here. Either there are no files or they - # are being seen for the first time here. Add all blobs to current - # revision updating date if necessary, and recursively analyse - # subdirectories as candidates to the outer frontier. - for blob in current.entry.files: - date = provenance.content_get_early_date(blob) - if date is None or revision.date < date: - provenance.content_set_early_date(blob, revision.date) - provenance.content_add_to_revision(revision, blob, current.path) - for child in current.children: - stack.append(child) - - -def is_new_frontier( - node: IsochroneNode, - revision: RevisionEntry, - trackall: bool = True, - lower: bool = True, - mindepth: int = 1, -) -> bool: - assert node.maxdate is not None # for mypy - assert revision.date is not None # idem - if trackall: - # The only real condition for a directory to be a frontier is that its - # content is already known and its maxdate is less (or equal) than - # current revision's date. Checking mindepth is meant to skip root - # directories (or any arbitrary depth) to improve the result. The - # option lower tries to maximize the reusage rate of previously defined - # frontiers by keeping them low in the directory tree. - return ( - node.known - and node.maxdate <= revision.date # all content is earlier than revision - and node.depth - >= mindepth # current node is deeper than the min allowed depth - and (has_blobs(node) if lower else True) # there is at least one blob in it - ) - else: - # If we are only tracking first occurrences, we want to ensure that all first - # occurrences end up in the content_early_in_rev relation. Thus, we force for - # every blob outside a frontier to have an extrictly earlier date. 
-        return (
-            node.maxdate < revision.date  # all content is earlier than revision
-            and node.depth >= mindepth  # deeper than the min allowed depth
-            and (has_blobs(node) if lower else True)  # there is at least one blob
-        )
+        return origin.id
+
+    def revision_add(self, revision: RevisionEntry):
+        # Add current revision to the compact DB
+        self.write_cache["revision"][revision.id] = revision.date
+        # update read cache as well
+        self.read_cache["revision"][revision.id] = revision.date
+
+    def revision_add_before_revision(
+        self, relative: RevisionEntry, revision: RevisionEntry
+    ):
+        self.write_cache["revision_before_rev"].append((revision.id, relative.id))
+
+    def revision_add_to_origin(self, origin: OriginEntry, revision: RevisionEntry):
+        self.write_cache["revision_in_org"].append((revision.id, origin.id))
+
+    def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]:
+        return self.get_dates("revision", [revision.id]).get(revision.id, None)
+
+    def revision_get_preferred_origin(self, revision: RevisionEntry) -> int:
+        # TODO: adapt this method to consider cached values
+        return self.storage.revision_get_preferred_origin(revision.id)
+
+    def revision_in_history(self, revision: RevisionEntry) -> bool:
+        # TODO: adapt this method to consider cached values
+        return self.storage.revision_in_history(revision.id)
+
+    def revision_set_preferred_origin(
+        self, origin: OriginEntry, revision: RevisionEntry
+    ):
+        assert origin.id is not None
+        # TODO: adapt this method to consider cached values
+        self.storage.revision_set_preferred_origin(origin.id, revision.id)
+
+    def revision_visited(self, revision: RevisionEntry) -> bool:
+        # TODO: adapt this method to consider cached values
+        return self.storage.revision_visited(revision.id)
-def has_blobs(node: IsochroneNode) -> bool:
-    # We may want to look for files in different ways to decide whether to define a
-    # frontier or not:
-    # 1. Only files in current node:
-    return any(node.entry.files)
-    # 2. Files anywhere in the isochrone graph
-    # stack = [node]
-    # while stack:
-    #     current = stack.pop()
-    #     if any(
-    #         map(lambda child: isinstance(child.entry, FileEntry), current.children)):
-    #         return True
-    #     else:
-    #         # All children are directory entries.
-    #         stack.extend(current.children)
-    # return False
-    # 3. Files in the intermediate directories between current node and any previously
-    #    defined frontier:
-    # TODO: complete this case!
-    # return any(
-    #     map(lambda child: isinstance(child.entry, FileEntry), node.children)
-    # ) or all(
-    #     map(
-    #         lambda child: (
-    #             not (isinstance(child.entry, DirectoryEntry) and child.date is None)
-    #         )
-    #         or has_blobs(child),
-    #         node.children,
-    #     )
-    # )
+def normalize(path: bytes) -> bytes:
+    return path[2:] if path.startswith(bytes("." + os.path.sep, "utf-8")) else path
diff --git a/swh/provenance/revision.py b/swh/provenance/revision.py
--- a/swh/provenance/revision.py
+++ b/swh/provenance/revision.py
@@ -1,14 +1,18 @@
 from datetime import datetime, timezone
 from itertools import islice
-from typing import Iterable, Iterator, Optional, Tuple
+import logging
+import os
+import time
+from typing import Iterable, Iterator, List, Optional, Tuple
 
 import iso8601
 
-from swh.model.hashutil import hash_to_bytes
-from swh.provenance.model import RevisionEntry
+from swh.model.hashutil import hash_to_bytes, hash_to_hex
 
-########################################################################################
-########################################################################################
+from .archive import ArchiveInterface
+from .graph import IsochroneNode, build_isochrone_graph
+from .model import DirectoryEntry, RevisionEntry
+from .provenance import ProvenanceInterface
 
 
 class CSVRevisionIterator:
@@ -48,3 +52,204 @@
                date=date,
                root=hash_to_bytes(root),
            )
+
+
+def revision_add(
+    provenance: ProvenanceInterface,
+    archive: ArchiveInterface,
+    revisions: List[RevisionEntry],
+    trackall: bool = True,
+    lower: bool = True,
+    mindepth: int = 1,
+    commit: bool = True,
+) -> None:
+    start = time.time()
+    for revision in revisions:
+        assert revision.date is not None
+        assert revision.root is not None
+        # Process content starting from the revision's root directory.
+        date = provenance.revision_get_early_date(revision)
+        if date is None or revision.date < date:
+            logging.debug(
+                f"Processing revision {hash_to_hex(revision.id)}"
+                f" (known date {date} / revision date {revision.date})..."
+            )
+            graph = build_isochrone_graph(
+                archive,
+                provenance,
+                revision,
+                DirectoryEntry(revision.root),
+            )
+            # TODO: add file size filtering
+            revision_process_content(
+                archive,
+                provenance,
+                revision,
+                graph,
+                trackall=trackall,
+                lower=lower,
+                mindepth=mindepth,
+            )
+    done = time.time()
+    if commit:
+        provenance.commit()
+    stop = time.time()
+    logging.debug(
+        f"Revisions {';'.join([hash_to_hex(revision.id) for revision in revisions])}"
+        f" were processed in {stop - start} secs (commit took {stop - done} secs)!"
+    )
+    # logging.critical(
+    #     ";".join([hash_to_hex(revision.id) for revision in revisions])
+    #     + f",{stop - start},{stop - done}"
+    # )
+
+
+def revision_process_content(
+    archive: ArchiveInterface,
+    provenance: ProvenanceInterface,
+    revision: RevisionEntry,
+    graph: IsochroneNode,
+    trackall: bool = True,
+    lower: bool = True,
+    mindepth: int = 1,
+):
+    assert revision.date is not None
+    provenance.revision_add(revision)
+
+    stack = [graph]
+    while stack:
+        current = stack.pop()
+        if current.dbdate is not None:
+            assert current.dbdate <= revision.date
+            if trackall:
+                # Current directory is an outer isochrone frontier for a previously
+                # processed revision. It should be reused as is.
+                provenance.directory_add_to_revision(
+                    revision, current.entry, current.path
+                )
+        else:
+            assert current.maxdate is not None
+            # Current directory is not an outer isochrone frontier for any previous
+            # revision. It might be eligible for this one.
+            if is_new_frontier(
+                current,
+                revision=revision,
+                trackall=trackall,
+                lower=lower,
+                mindepth=mindepth,
+            ):
+                # Outer frontier should be moved to current position in the isochrone
+                # graph. This is the first time this directory is found in the isochrone
+                # frontier.
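+                # (Recording maxdate as this directory's date below makes the node
+                # show up as a known frontier for later revisions, which can then
+                # reuse it without re-walking its content.)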
+                provenance.directory_set_date_in_isochrone_frontier(
+                    current.entry, current.maxdate
+                )
+                if trackall:
+                    provenance.directory_add_to_revision(
+                        revision, current.entry, current.path
+                    )
+                    flatten_directory(archive, provenance, current.entry)
+            else:
+                # If current node is an invalidated frontier, update its date for future
+                # revisions to get the proper value.
+                if current.invalid:
+                    provenance.directory_set_date_in_isochrone_frontier(
+                        current.entry, current.maxdate
+                    )
+                # No point moving the frontier here. Either there are no files or they
+                # are being seen for the first time here. Add all blobs to current
+                # revision updating date if necessary, and recursively analyse
+                # subdirectories as candidates for the outer frontier.
+                for blob in current.entry.files:
+                    date = provenance.content_get_early_date(blob)
+                    if date is None or revision.date < date:
+                        provenance.content_set_early_date(blob, revision.date)
+                    provenance.content_add_to_revision(revision, blob, current.path)
+                for child in current.children:
+                    stack.append(child)
+
+
+def flatten_directory(
+    archive: ArchiveInterface,
+    provenance: ProvenanceInterface,
+    directory: DirectoryEntry,
+) -> None:
+    """Recursively retrieve all the files of 'directory' and insert them in the
+    'provenance' database in the 'content_to_directory' table.
+    """
+    stack = [(directory, b"")]
+    while stack:
+        current, prefix = stack.pop()
+        current.retrieve_children(archive)
+        for f_child in current.files:
+            # Add content to the directory with the computed prefix.
+            provenance.content_add_to_directory(directory, f_child, prefix)
+        for d_child in current.dirs:
+            # Recursively walk the child directory.
+            stack.append((d_child, os.path.join(prefix, d_child.name)))
+
+
+def is_new_frontier(
+    node: IsochroneNode,
+    revision: RevisionEntry,
+    trackall: bool = True,
+    lower: bool = True,
+    mindepth: int = 1,
+) -> bool:
+    assert node.maxdate is not None  # for mypy
+    assert revision.date is not None  # idem
+    if trackall:
+        # The only real condition for a directory to be a frontier is that its
+        # content is already known and its maxdate is less than (or equal to)
+        # the current revision's date. Checking mindepth is meant to skip root
+        # directories (or any arbitrary depth) to improve the result. The
+        # option lower tries to maximize the reuse of previously defined
+        # frontiers by keeping them low in the directory tree.
+        return (
+            node.known
+            and node.maxdate <= revision.date  # all content is earlier than revision
+            and node.depth
+            >= mindepth  # current node is deeper than the min allowed depth
+            and (has_blobs(node) if lower else True)  # there is at least one blob in it
+        )
+    else:
+        # If we are only tracking first occurrences, we want to ensure that all first
+        # occurrences end up in the content_early_in_rev relation. Thus, we require
+        # every blob outside a frontier to have a strictly earlier date.
+        return (
+            node.maxdate < revision.date  # all content is earlier than revision
+            and node.depth >= mindepth  # deeper than the min allowed depth
+            and (has_blobs(node) if lower else True)  # there is at least one blob
+        )
+
+
+def has_blobs(node: IsochroneNode) -> bool:
+    # We may want to look for files in different ways to decide whether to define a
+    # frontier or not:
+    # 1. Only files in current node:
+    return any(node.entry.files)
+    # 2. Files anywhere in the isochrone graph
+    # stack = [node]
+    # while stack:
+    #     current = stack.pop()
+    #     if any(
+    #         map(lambda child: isinstance(child.entry, FileEntry), current.children)):
+    #         return True
+    #     else:
+    #         # All children are directory entries.
+    #         stack.extend(current.children)
+    # return False
+    # 3. Files in the intermediate directories between current node and any previously
+    #    defined frontier:
+    # TODO: complete this case!
+    # return any(
+    #     map(lambda child: isinstance(child.entry, FileEntry), node.children)
+    # ) or all(
+    #     map(
+    #         lambda child: (
+    #             not (isinstance(child.entry, DirectoryEntry) and child.date is None)
+    #         )
+    #         or has_blobs(child),
+    #         node.children,
+    #     )
+    # )
diff --git a/swh/provenance/tests/conftest.py b/swh/provenance/tests/conftest.py
--- a/swh/provenance/tests/conftest.py
+++ b/swh/provenance/tests/conftest.py
@@ -36,12 +36,10 @@
 @pytest.fixture
 def provenance(provenance_db):
     """return a working and initialized provenance db"""
-    from swh.provenance.postgresql.provenancedb_with_path import (
-        ProvenanceWithPathDB as ProvenanceDB,
-    )
+    from swh.provenance.provenance import ProvenanceBackend
 
     BaseDb.adapt_conn(provenance_db)
-    prov = ProvenanceDB(provenance_db)
+    prov = ProvenanceBackend(provenance_db)
     # in test sessions, we DO want to raise any exception occurring at commit time
     prov.raise_on_commit = True
     return prov
diff --git a/swh/provenance/tests/test_conftest.py b/swh/provenance/tests/test_conftest.py
--- a/swh/provenance/tests/test_conftest.py
+++ b/swh/provenance/tests/test_conftest.py
@@ -7,7 +7,7 @@
 def test_provenance_fixture(provenance):
     """Check the 'provenance' fixture produce a working ProvenanceDB object"""
     assert provenance
-    provenance.insert_all()  # should be a noop
+    provenance.commit()  # should be a noop
 
 
 def test_storage(swh_storage_with_objects):
diff --git a/swh/provenance/tests/test_isochrone_graph.py b/swh/provenance/tests/test_isochrone_graph.py
--- a/swh/provenance/tests/test_isochrone_graph.py
+++ b/swh/provenance/tests/test_isochrone_graph.py
@@ -10,8 +10,9 @@
 import yaml
 
 from swh.model.hashutil import hash_to_bytes
+from swh.provenance.graph import IsochroneNode, build_isochrone_graph
 from swh.provenance.model import DirectoryEntry, RevisionEntry
-from swh.provenance.provenance import IsochroneNode, build_isochrone_graph, revision_add
+from swh.provenance.revision import revision_add
 from swh.provenance.tests.conftest import fill_storage, get_datafile, load_repo_data
 from swh.provenance.tests.test_provenance_db import ts2dt
 
diff --git a/swh/provenance/tests/test_provenance_db.py b/swh/provenance/tests/test_provenance_db.py
--- a/swh/provenance/tests/test_provenance_db.py
+++ b/swh/provenance/tests/test_provenance_db.py
@@ -6,8 +6,8 @@
 import datetime
 
 from swh.model.tests.swh_model_data import TEST_OBJECTS
-from swh.provenance.origin import OriginEntry
-from swh.provenance.provenance import origin_add
+from swh.provenance.model import OriginEntry
+from swh.provenance.origin import origin_add
 from swh.provenance.storage.archive import ArchiveStorage
 
 
diff --git a/swh/provenance/tests/test_provenance_heuristics.py b/swh/provenance/tests/test_provenance_heuristics.py
--- a/swh/provenance/tests/test_provenance_heuristics.py
+++ b/swh/provenance/tests/test_provenance_heuristics.py
@@ -8,7 +8,7 @@
 import pytest
 
 from swh.provenance.model import RevisionEntry
-from swh.provenance.provenance import revision_add
+from swh.provenance.revision import revision_add
 from swh.provenance.tests.conftest import (
     fill_storage,
     get_datafile,
@@ -116,19 +116,21 @@
 
         # each "entry" in the synth file is one new revision
         rows["revision"].add(synth_rev["sha1"].hex())
-        assert rows["revision"] == sha1s(provenance.cursor, "revision"), synth_rev[
-            "msg"
-        ]
+        assert rows["revision"] == sha1s(
+            provenance.storage.cursor, "revision"
+        ), synth_rev["msg"]
         # check the timestamp of the revision
         rev_ts = synth_rev["date"]
         assert get_timestamp(
-            provenance.cursor, "revision", synth_rev["sha1"].hex()
+            provenance.storage.cursor, "revision", synth_rev["sha1"].hex()
        ) == [rev_ts], synth_rev["msg"]
 
         # this revision might have added new content objects
         rows["content"] |= set(x["dst"].hex() for x in synth_rev["R_C"])
         rows["content"] |= set(x["dst"].hex() for x in synth_rev["D_C"])
-        assert rows["content"] == sha1s(provenance.cursor, "content"), synth_rev["msg"]
+        assert rows["content"] == sha1s(
+            provenance.storage.cursor, "content"
+        ), synth_rev["msg"]
 
         # check for R-C (direct) entries
         # these are added directly in the content_early_in_rev table
@@ -136,11 +138,11 @@
             (x["dst"].hex(), x["src"].hex(), x["path"]) for x in synth_rev["R_C"]
         )
         assert rows["content_early_in_rev"] == relations(
-            provenance.cursor, "content", "revision"
+            provenance.storage.cursor, "content", "revision"
         ), synth_rev["msg"]
         # check timestamps
         for rc in synth_rev["R_C"]:
-            assert get_timestamp(provenance.cursor, "content", rc["dst"]) == [
+            assert get_timestamp(provenance.storage.cursor, "content", rc["dst"]) == [
                 rev_ts + rc["rel_ts"]
             ], synth_rev["msg"]
 
@@ -148,9 +150,9 @@
         # each directory stored in the provenance index is an entry
         # in the "directory" table...
         rows["directory"] |= set(x["dst"].hex() for x in synth_rev["R_D"])
-        assert rows["directory"] == sha1s(provenance.cursor, "directory"), synth_rev[
-            "msg"
-        ]
+        assert rows["directory"] == sha1s(
+            provenance.storage.cursor, "directory"
+        ), synth_rev["msg"]
 
         # ... + a number of rows in the "directory_in_rev" table...
         # check for R-D entries
@@ -158,11 +160,11 @@
             (x["dst"].hex(), x["src"].hex(), x["path"]) for x in synth_rev["R_D"]
         )
         assert rows["directory_in_rev"] == relations(
-            provenance.cursor, "directory", "revision"
+            provenance.storage.cursor, "directory", "revision"
         ), synth_rev["msg"]
         # check timestamps
         for rd in synth_rev["R_D"]:
-            assert get_timestamp(provenance.cursor, "directory", rd["dst"]) == [
+            assert get_timestamp(provenance.storage.cursor, "directory", rd["dst"]) == [
                 rev_ts + rd["rel_ts"]
             ], synth_rev["msg"]
 
@@ -173,11 +175,11 @@
             (x["dst"].hex(), x["src"].hex(), x["path"]) for x in synth_rev["D_C"]
         )
         assert rows["content_in_dir"] == relations(
-            provenance.cursor, "content", "directory"
+            provenance.storage.cursor, "content", "directory"
         ), synth_rev["msg"]
         # check timestamps
         for dc in synth_rev["D_C"]:
-            assert get_timestamp(provenance.cursor, "content", dc["dst"]) == [
+            assert get_timestamp(provenance.storage.cursor, "content", dc["dst"]) == [
                 rev_ts + dc["rel_ts"]
             ], synth_rev["msg"]
 
@@ -185,7 +187,9 @@
         rows["location"] |= set(x["path"] for x in synth_rev["R_C"])
         rows["location"] |= set(x["path"] for x in synth_rev["D_C"])
         rows["location"] |= set(x["path"] for x in synth_rev["R_D"])
-        assert rows["location"] == locations(provenance.cursor), synth_rev["msg"]
+        assert rows["location"] == locations(provenance.storage.cursor), synth_rev[
+            "msg"
+        ]
 
 
 @pytest.mark.parametrize(