diff --git a/swh/provenance/model.py b/swh/provenance/model.py --- a/swh/provenance/model.py +++ b/swh/provenance/model.py @@ -4,7 +4,7 @@ # See top-level LICENSE file for more information from datetime import datetime -from typing import Iterable, List, Optional, Union +from typing import Iterable, Iterator, List, Optional, Union from .archive import ArchiveInterface @@ -49,6 +49,9 @@ ] yield from self._nodes + def __str__(self): + return f"" + class DirectoryEntry: def __init__(self, id: bytes, name: bytes): @@ -56,7 +59,9 @@ self.name = name self._children: Optional[List[Union[DirectoryEntry, FileEntry]]] = None - def ls(self, archive: ArchiveInterface): + def ls( + self, archive: ArchiveInterface + ) -> Iterator[Union["DirectoryEntry", "FileEntry"]]: if self._children is None: self._children = [] for child in archive.directory_ls(self.id): @@ -68,8 +73,20 @@ self._children.append(FileEntry(child["target"], child["name"])) yield from self._children + def ls_files(self, archive: ArchiveInterface) -> List["FileEntry"]: + return [x for x in self.ls(archive) if isinstance(x, FileEntry)] + + def ls_dirs(self, archive: ArchiveInterface) -> List["DirectoryEntry"]: + return [x for x in self.ls(archive) if isinstance(x, DirectoryEntry)] + + def __str__(self): + return f"" + class FileEntry: def __init__(self, id: bytes, name: bytes): self.id = id self.name = name + + def __str__(self): + return f"" diff --git a/swh/provenance/postgresql/provenancedb_base.py b/swh/provenance/postgresql/provenancedb_base.py --- a/swh/provenance/postgresql/provenancedb_base.py +++ b/swh/provenance/postgresql/provenancedb_base.py @@ -90,9 +90,9 @@ f"""SELECT sha1, date FROM content WHERE sha1 IN ({values})""", tuple(pending), ) - for row in self.cursor.fetchall(): - dates[row[0]] = row[1] - self.select_cache["content"][row[0]] = row[1] + for sha1, date in self.cursor.fetchall(): + dates[sha1] = date + self.select_cache["content"][sha1] = date return dates def content_set_early_date(self, 
blob: FileEntry, date: datetime): @@ -140,9 +140,9 @@ f"""SELECT sha1, date FROM directory WHERE sha1 IN ({values})""", tuple(pending), ) - for row in self.cursor.fetchall(): - dates[row[0]] = row[1] - self.select_cache["directory"][row[0]] = row[1] + for sha1, date in self.cursor.fetchall(): + dates[sha1] = date + self.select_cache["directory"][sha1] = date return dates def directory_invalidate_in_isochrone_frontier(self, directory: DirectoryEntry): diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py --- a/swh/provenance/provenance.py +++ b/swh/provenance/provenance.py @@ -2,7 +2,9 @@ import logging import os from typing import Dict, Generator, List, Optional, Tuple + from typing_extensions import Protocol, runtime_checkable + from swh.model.hashutil import hash_to_hex from .archive import ArchiveInterface @@ -227,24 +229,49 @@ class IsochroneNode: def __init__( - self, entry: DirectoryEntry, date: Optional[datetime] = None, depth: int = 0 + self, entry: DirectoryEntry, dbdate: Optional[datetime] = None, depth: int = 0 ): self.entry = entry self.depth = depth - self.date = date - self.known: bool = self.date is not None + + # dbdate is the maxdate for this node that comes from the DB + self._dbdate: Optional[datetime] = dbdate + # maxdate is set by the maxdate computation algorithm (if not in the DB) + # if 'dbdate' is not None, then it should be used as maxdate + self.maxdate: Optional[datetime] = dbdate + self.known: bool = self.dbdate is not None self.children: List[IsochroneNode] = [] - self.maxdate: Optional[datetime] = None + self.files: List[FileEntry] = [] + + def __str__(self): + return ( + f"<{self.entry.__class__.__name__}[{self.entry.name}]: " + f"known={self.known}, maxdate={self.maxdate}, dbdate={self.dbdate}>" + ) + + @property + def dbdate(self): + # use a property to make this attribute (mostly) read-only + return self._dbdate - def add_child( - self, child: DirectoryEntry, dates: Dict[bytes, datetime] = {} + def 
clear_dbdate(self): + self._dbdate = None + self.maxdate = None + + def add_directory( + self, child: DirectoryEntry, date: Optional[datetime] = None ) -> "IsochroneNode": - assert isinstance(self.entry, DirectoryEntry) and self.date is None - node = IsochroneNode( - child, date=dates.get(child.id, None), depth=self.depth + 1) + # we should not be processing this node (ie add subdirectories or + # files) if it's actually known by the provenance DB + assert self.dbdate is None + node = IsochroneNode(child, dbdate=date, depth=self.depth + 1) self.children.append(node) return node + def add_file(self, file: FileEntry): + assert self.dbdate is None # same as above + self.files.append(file) + def build_isochrone_graph( archive: ArchiveInterface, @@ -255,89 +282,94 @@ assert revision.date is not None assert revision.root == directory.id + # this function processes a revision in 2 steps: + # + # 1. build the tree structure of IsochroneNode objects (one INode per + # directory under the root directory of the revision but not following + # known subdirectories), and gather the dates from the DB for already + # known objects; for files, just keep all the dates in a global 'fdates' + # dict; note that in this step, we will only recurse the directories + # that are not already known. + # + # 2. compute the max date for each node of the tree that was not found in the DB. + # Build the nodes structure root_date = provenance.directory_get_date_in_isochrone_frontier(directory) - root = IsochroneNode(directory, date=root_date) + root = IsochroneNode(directory, dbdate=root_date) stack = [root] logging.debug( f"Recursively creating graph for revision {hash_to_hex(revision.id)}..." 
) + fdates: Dict[bytes, datetime] = {} # map {file_id: date} while stack: current = stack.pop() - assert isinstance(current.entry, DirectoryEntry) - if current.date is None or current.date > revision.date: + if current.dbdate is None or current.dbdate > revision.date: # If current directory has an associated date in the isochrone frontier that # is greater or equal to the current revision's one, it should be ignored as # the revision is being processed out of order. - if current.date is not None and current.date > revision.date: + if current.dbdate is not None and current.dbdate > revision.date: provenance.directory_invalidate_in_isochrone_frontier(current.entry) - current.date = None - current.known = False - # Pre-query all known dates for content/directories in the current directory + current.clear_dbdate() + + # Pre-query all known dates for directories in the current directory # for the provenance object to have them cached and (potentially) improve # performance. ddates = provenance.directory_get_dates_in_isochrone_frontier( - [ - child - for child in current.entry.ls(archive) - if isinstance(child, DirectoryEntry) - ] + current.entry.ls_dirs(archive) ) - fdates = provenance.content_get_early_dates( - [ - child - for child in current.entry.ls(archive) - if isinstance(child, FileEntry) - ] - ) - for child in current.entry.ls(archive): + for dir in current.entry.ls_dirs(archive): # Recursively analyse directory nodes. - if isinstance(child, DirectoryEntry): - node = current.add_child(child, dates=ddates) - stack.append(node) - else: - # WARNING: there is a type checking issue here! 
- current.add_child(child, dates=fdates) + node = current.add_directory(dir, date=ddates.get(dir.id, None)) + stack.append(node) + + fdates.update( + provenance.content_get_early_dates(current.entry.ls_files(archive)) + ) + for file in current.entry.ls_files(archive): + current.add_file(file) + logging.debug( f"Isochrone graph for revision {hash_to_hex(revision.id)} successfully created!" ) # Precalculate max known date for each node in the graph (only directory nodes are # pushed to the stack). - stack = [root] logging.debug(f"Computing maxdates for revision {hash_to_hex(revision.id)}...") + stack = [root] + while stack: current = stack.pop() # Current directory node is known if it already has an assigned date (ie. it was # already seen as an isochrone frontier). - if not current.known: - if any(map(lambda child: child.maxdate is None, current.children)): + if current.dbdate is None: + if any(x.maxdate is None for x in current.children): + # at least one child of current has no maxdate yet # Current node needs to be analysed again after its children. stack.append(current) for child in current.children: - if isinstance(child.entry, FileEntry): - # A file node is known if it already has an assigned date (ie. - # is was processed before) - if child.known: - assert child.date is not None - # Just use its known date. - child.maxdate = child.date - else: - # Use current revision date. - child.maxdate = revision.date - else: + if child.maxdate is None: + # if child.maxdate is None, it must be processed stack.append(child) else: - maxdates = [ - child.maxdate - for child in current.children - if child.maxdate is not None # mostly to please mypy - ] - current.maxdate = max(maxdates) if maxdates else UTCMIN + # all the files and directories under current have a maxdate, + # we can infer the maxdate for current directory + assert current.maxdate is None # If all content is already known, update current directory info. 
- current.known = all(map(lambda child: child.known, current.children)) - else: - # Directory node in the frontier, just use its known date. - current.maxdate = current.date + current.maxdate = max( + [UTCMIN] + + [ + child.maxdate + for child in current.children + if child.maxdate is not None # unnecessary, but needed for mypy + ] + + [fdates.get(file.id, revision.date) for file in current.files] + ) + current.known = ( + # true if all subdirectories are known + all(child.known for child in current.children) + # true if all files were known + # *before building this isochrone graph node* + and all((file.id in fdates) for file in current.files) + ) logging.debug( f"Maxdates for revision {hash_to_hex(revision.id)} successfully computed!" ) @@ -356,57 +388,66 @@ logging.debug( f"Building isochrone graph for revision {hash_to_hex(revision.id)}..." ) - stack = [(build_isochrone_graph(archive, provenance, revision, root), root.name)] + rootnode = build_isochrone_graph(archive, provenance, revision, root) + stack = [(rootnode, root.name)] logging.debug( f"Isochrone graph for revision {hash_to_hex(revision.id)} successfully built!" ) while stack: current, path = stack.pop() - assert isinstance(current.entry, DirectoryEntry) - if current.date is not None: - assert current.date <= revision.date - # Current directory is an outer isochrone frontier for a previously - # processed revision. It should be reused as is. + # Current directory is not an outer isochrone frontier for any previous + # revision. It might be eligible for this one. + if is_new_frontier( + current, + revision=revision, + lower=lower, + mindepth=mindepth, + ): + # Outer frontier should be moved to current position in the isochrone + # graph. This is the first time this directory is found in the isochrone + # frontier. 
+ assert current.maxdate is not None # please mypy + provenance.directory_set_date_in_isochrone_frontier( + current.entry, + current.maxdate, + ) provenance.directory_add_to_revision(revision, current.entry, path) + flatten_directory(archive, provenance, current.entry) else: - # Current directory is not an outer isochrone frontier for any previous - # revision. It might be eligible for this one. - if is_new_frontier(current, revision, lower=lower, mindepth=mindepth): - assert current.maxdate is not None - # Outer frontier should be moved to current position in the isochrone - # graph. This is the first time this directory is found in the isochrone - # frontier. - provenance.directory_set_date_in_isochrone_frontier( - current.entry, current.maxdate - ) + if current.dbdate is not None: + assert current.dbdate <= revision.date + # Current directory is an outer isochrone frontier for a previously + # processed revision. It should be reused as is. provenance.directory_add_to_revision(revision, current.entry, path) - flatten_directory(archive, provenance, current.entry) else: # No point moving the frontier here. Either there are no files or they # are being seen for the first time here. Add all blobs to current # revision updating date if necessary, and recursively analyse # subdirectories as candidates to the outer frontier. 
+ for blob in current.files: + date = provenance.content_get_early_date(blob) + if date is None or revision.date < date: + provenance.content_set_early_date(blob, revision.date) + provenance.content_add_to_revision(revision, blob, path) for child in current.children: - if isinstance(child.entry, FileEntry): - blob = child.entry - if child.date is None or revision.date < child.date: - provenance.content_set_early_date(blob, revision.date) - provenance.content_add_to_revision(revision, blob, path) - else: - stack.append((child, os.path.join(path, child.entry.name))) + stack.append((child, os.path.join(path, child.entry.name))) def is_new_frontier( - node: IsochroneNode, revision: RevisionEntry, lower: bool = True, mindepth: int = 1 + node: IsochroneNode, + revision: RevisionEntry, + lower: bool = True, + mindepth: int = 1, ) -> bool: - assert node.maxdate is not None and revision.date is not None # The only real condition for a directory to be a frontier is that its content is # already known and its maxdate is less (or equal) than current revision's date. # Checking mindepth is meant to skip root directories (or any arbitrary depth) to # improve the result. The option lower tries to maximize the reusage rate of # previously defined frontiers by keeping them low in the directory tree. + assert node.maxdate is not None # for mypy + assert revision.date is not None # idem return ( - node.known # all content in node was already seen before + node.known and node.maxdate <= revision.date # all content is earlier than revision and node.depth >= mindepth # current node is deeper than the min allowed depth and (has_blobs(node) if lower else True) # there is at least one blob in it @@ -417,7 +458,7 @@ # We may want to look for files in different ways to decide whether to define a # frontier or not: # 1. Only files in current node: - return any(map(lambda child: isinstance(child.entry, FileEntry), node.children)) + return bool(node.files) # 2. 
Files anywhere in the isochrone graph # stack = [node] # while stack: