diff --git a/swh/provenance/model.py b/swh/provenance/model.py --- a/swh/provenance/model.py +++ b/swh/provenance/model.py @@ -4,7 +4,7 @@ # See top-level LICENSE file for more information from datetime import datetime -from typing import Iterable, List, Optional, Union +from typing import Iterable, Iterator, List, Optional, Union from .archive import ArchiveInterface @@ -49,6 +49,9 @@ ] yield from self._nodes + def __str__(self): + return f"" + class DirectoryEntry: def __init__(self, id: bytes, name: bytes = b""): @@ -56,7 +59,9 @@ self.name = name self._children: Optional[List[Union[DirectoryEntry, FileEntry]]] = None - def ls(self, archive: ArchiveInterface): + def ls( + self, archive: ArchiveInterface + ) -> Iterator[Union["DirectoryEntry", "FileEntry"]]: if self._children is None: self._children = [] for child in archive.directory_ls(self.id): @@ -68,8 +73,20 @@ self._children.append(FileEntry(child["target"], child["name"])) yield from self._children + def ls_files(self, archive: ArchiveInterface) -> List["FileEntry"]: + return [x for x in self.ls(archive) if isinstance(x, FileEntry)] + + def ls_dirs(self, archive: ArchiveInterface) -> List["DirectoryEntry"]: + return [x for x in self.ls(archive) if isinstance(x, DirectoryEntry)] + + def __str__(self): + return f"" + class FileEntry: def __init__(self, id: bytes, name: bytes): self.id = id self.name = name + + def __str__(self): + return f"" diff --git a/swh/provenance/postgresql/provenancedb_base.py b/swh/provenance/postgresql/provenancedb_base.py --- a/swh/provenance/postgresql/provenancedb_base.py +++ b/swh/provenance/postgresql/provenancedb_base.py @@ -90,9 +90,9 @@ f"""SELECT sha1, date FROM content WHERE sha1 IN ({values})""", tuple(pending), ) - for row in self.cursor.fetchall(): - dates[row[0]] = row[1] - self.select_cache["content"][row[0]] = row[1] + for sha1, date in self.cursor.fetchall(): + dates[sha1] = date + self.select_cache["content"][sha1] = date return dates def 
content_set_early_date(self, blob: FileEntry, date: datetime): @@ -140,9 +140,9 @@ f"""SELECT sha1, date FROM directory WHERE sha1 IN ({values})""", tuple(pending), ) - for row in self.cursor.fetchall(): - dates[row[0]] = row[1] - self.select_cache["directory"][row[0]] = row[1] + for sha1, date in self.cursor.fetchall(): + dates[sha1] = date + self.select_cache["directory"][sha1] = date return dates def directory_invalidate_in_isochrone_frontier(self, directory: DirectoryEntry): diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py --- a/swh/provenance/provenance.py +++ b/swh/provenance/provenance.py @@ -251,31 +251,52 @@ def __init__( self, entry: DirectoryEntry, - date: Optional[datetime] = None, + dbdate: Optional[datetime] = None, depth: int = 0, prefix: bytes = b"", ): self.entry = entry self.depth = depth - self.date = date - self.known = self.date is not None - self.children: List[IsochroneNode] = [] - self.maxdate: Optional[datetime] = None + + # dbdate is the maxdate for this node that comes from the DB + self._dbdate: Optional[datetime] = dbdate + # maxdate is set by the maxdate computation algorithm (if not in the DB) + # if 'dbdate' is not None, then it should be used as maxdate + self.maxdate: Optional[datetime] = dbdate + self.known: bool = self.dbdate is not None self.path = os.path.join(prefix, self.entry.name) if prefix else self.entry.name + self.children: List[IsochroneNode] = [] + self.files: List[FileEntry] = [] + + def __str__(self): + return ( + f"<{self.entry.__class__.__name__}[{self.entry.name}]: " + f"known={self.known}, maxdate={self.maxdate}, dbdate={self.dbdate}>" + ) + + @property + def dbdate(self): + # use a property to make this attribute (mostly) read-only + return self._dbdate + + def clear_dbdate(self): + self._dbdate = None + self.maxdate = None - def add_child( + def add_directory( self, child: DirectoryEntry, date: Optional[datetime] = None ) -> "IsochroneNode": - assert isinstance(self.entry, 
DirectoryEntry) and self.date is None - node = IsochroneNode( - child, - date=date, - depth=self.depth + 1, - prefix=self.path, - ) + # we should not be processing this node (ie add subdirectories or + # files) if it's actually known by the provenance DB + assert self.dbdate is None + node = IsochroneNode(child, dbdate=date, depth=self.depth + 1, prefix=self.path) self.children.append(node) return node + def add_file(self, file: FileEntry): + assert self.dbdate is None # same as above + self.files.append(file) + def build_isochrone_graph( archive: ArchiveInterface, @@ -286,95 +307,100 @@ assert revision.date is not None assert revision.root == directory.id + # this function processes a revision in 2 steps: + # + # 1. build the tree structure of IsochroneNode objects (one INode per + # directory under the root directory of the revision but not following + # known subdirectories), and gather the dates from the DB for already + # known objects; for files, just keep all the dates in a global 'fdates' + # dict; note that in this step, we will only recurse the directories + # that are not already known. + # + # 2. compute the max date for each node of the tree that was not found in the DB. + # Build the nodes structure root_date = provenance.directory_get_date_in_isochrone_frontier(directory) - root = IsochroneNode(directory, date=root_date) + root = IsochroneNode(directory, dbdate=root_date) stack = [root] logging.debug( f"Recursively creating graph for revision {hash_to_hex(revision.id)}..." ) + fdates: Dict[bytes, datetime] = {} # map {file_id: date} while stack: current = stack.pop() - assert isinstance(current.entry, DirectoryEntry) - if current.date is None or current.date > revision.date: + if current.dbdate is None or current.dbdate > revision.date: # If current directory has an associated date in the isochrone frontier that # is greater or equal to the current revision's one, it should be ignored as # the revision is being processed out of order. 
- if current.date is not None and current.date > revision.date: + if current.dbdate is not None and current.dbdate > revision.date: logging.debug( f"Invalidating frontier on {hash_to_hex(current.entry.id)}" - f" (date {current.date})" + f" (date {current.dbdate})" f" when processing revision {hash_to_hex(revision.id)}" f" (date {revision.date})" ) provenance.directory_invalidate_in_isochrone_frontier(current.entry) - current.date = None - current.known = False - # Pre-query all known dates for content/directories in the current directory + current.clear_dbdate() + + # Pre-query all known dates for directories in the current directory # for the provenance object to have them cached and (potentially) improve # performance. ddates = provenance.directory_get_dates_in_isochrone_frontier( - [ - child - for child in current.entry.ls(archive) - if isinstance(child, DirectoryEntry) - ] + current.entry.ls_dirs(archive) ) - fdates = provenance.content_get_early_dates( - [ - child - for child in current.entry.ls(archive) - if isinstance(child, FileEntry) - ] - ) - for child in current.entry.ls(archive): + for dir in current.entry.ls_dirs(archive): # Recursively analyse directory nodes. - if isinstance(child, DirectoryEntry): - node = current.add_child(child, date=ddates.get(child.id)) - stack.append(node) - else: - # WARNING: there is a type checking issue here! - current.add_child(child, date=fdates.get(child.id)) + node = current.add_directory(dir, date=ddates.get(dir.id, None)) + stack.append(node) + + fdates.update( + provenance.content_get_early_dates(current.entry.ls_files(archive)) + ) + for file in current.entry.ls_files(archive): + current.add_file(file) + logging.debug( f"Isochrone graph for revision {hash_to_hex(revision.id)} successfully created!" ) # Precalculate max known date for each node in the graph (only directory nodes are # pushed to the stack). 
- stack = [root] logging.debug(f"Computing maxdates for revision {hash_to_hex(revision.id)}...") + stack = [root] + while stack: current = stack.pop() # Current directory node is known if it already has an assigned date (ie. it was # already seen as an isochrone frontier). - if not current.known: - if any(map(lambda child: child.maxdate is None, current.children)): + if current.dbdate is None: + if any(x.maxdate is None for x in current.children): + # at least one child of current has no maxdate yet # Current node needs to be analysed again after its children. stack.append(current) for child in current.children: - if isinstance(child.entry, FileEntry): - # A file node is known if it already has an assigned date (ie. - # is was processed before) - if child.known: - assert child.date is not None - # Just use its known date. - child.maxdate = child.date - else: - # Use current revision date. - child.maxdate = revision.date - else: + if child.maxdate is None: + # if child.maxdate is None, it must be processed stack.append(child) else: - maxdates = [ - child.maxdate - for child in current.children - if child.maxdate is not None # mostly to please mypy - ] - current.maxdate = max(maxdates) if maxdates else UTCMIN + # all the files and directories under current have a maxdate, + # we can infer the maxdate for current directory + assert current.maxdate is None # If all content is already known, update current directory info. - current.known = all(map(lambda child: child.known, current.children)) - else: - # Directory node in the frontier, just use its known date. 
- current.maxdate = current.date + current.maxdate = max( + [UTCMIN] + + [ + child.maxdate + for child in current.children + if child.maxdate is not None # unnecessary, but needed for mypy + ] + + [fdates.get(file.id, revision.date) for file in current.files] + ) + current.known = ( + # true if all subdirectories are known + all(child.known for child in current.children) + # true if all files were known + # *before building this isochrone graph node* + and all((file.id in fdates) for file in current.files) + ) logging.debug( f"Maxdates for revision {hash_to_hex(revision.id)} successfully computed!" ) @@ -396,9 +422,8 @@ stack = [graph] while stack: current = stack.pop() - assert isinstance(current.entry, DirectoryEntry) - if current.date is not None: - assert current.date <= revision.date + if current.dbdate is not None: + assert current.dbdate <= revision.date if trackall: # Current directory is an outer isochrone frontier for a previously # processed revision. It should be reused as is. @@ -409,7 +434,11 @@ # Current directory is not an outer isochrone frontier for any previous # revision. It might be eligible for this one. if is_new_frontier( - current, revision, trackall=trackall, lower=lower, mindepth=mindepth + current, + revision=revision, + trackall=trackall, + lower=lower, + mindepth=mindepth, ): assert current.maxdate is not None # Outer frontier should be moved to current position in the isochrone @@ -428,14 +457,13 @@ # are being seen for the first time here. Add all blobs to current # revision updating date if necessary, and recursively analyse # subdirectories as candidates to the outer frontier. 
+ for blob in current.files: + date = provenance.content_get_early_date(blob) + if date is None or revision.date < date: + provenance.content_set_early_date(blob, revision.date) + provenance.content_add_to_revision(revision, blob, current.path) for child in current.children: - if isinstance(child.entry, FileEntry): - blob = child.entry - if child.date is None or revision.date < child.date: - provenance.content_set_early_date(blob, revision.date) - provenance.content_add_to_revision(revision, blob, current.path) - else: - stack.append(child) + stack.append(child) def is_new_frontier( @@ -445,19 +473,21 @@ lower: bool = True, mindepth: int = 1, ) -> bool: - assert node.maxdate is not None and revision.date is not None + assert node.maxdate is not None # for mypy + assert revision.date is not None # idem if trackall: - # The only real condition for a directory to be a frontier is that its content - # is already known and its maxdate is less (or equal) than current revision's - # date. Checking mindepth is meant to skip root directories (or any arbitrary - # depth) to improve the result. The option lower tries to maximize the reusage - # rate of previously defined frontiers by keeping them low in the directory - # tree. + # The only real condition for a directory to be a frontier is that its + # content is already known and its maxdate is less (or equal) than + # current revision's date. Checking mindepth is meant to skip root + # directories (or any arbitrary depth) to improve the result. The + # option lower tries to maximize the reusage rate of previously defined + # frontiers by keeping them low in the directory tree. 
return ( - node.known # all content in node was already seen before + node.known and node.maxdate <= revision.date # all content is earlier than revision - and node.depth >= mindepth # deeper than the min allowed depth - and (has_blobs(node) if lower else True) # there is at least one blob + and node.depth + >= mindepth # current node is deeper than the min allowed depth + and (has_blobs(node) if lower else True) # there is at least one blob in it ) else: # If we are only tracking first occurrences, we want to ensure that all first @@ -474,7 +504,7 @@ # We may want to look for files in different ways to decide whether to define a # frontier or not: # 1. Only files in current node: - return any(map(lambda child: isinstance(child.entry, FileEntry), node.children)) + return bool(node.files) # 2. Files anywhere in the isochrone graph # stack = [node] # while stack: