diff --git a/swh/provenance/model.py b/swh/provenance/model.py
index f3b0226..3ab0214 100644
--- a/swh/provenance/model.py
+++ b/swh/provenance/model.py
@@ -1,84 +1,103 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from datetime import datetime
-from typing import Iterable, List, Optional, Union
+from typing import Iterable, Iterator, List, Optional

 from .archive import ArchiveInterface


 class OriginEntry:
     def __init__(self, url, revisions: Iterable["RevisionEntry"], id=None):
         self.id = id
         self.url = url
         self.revisions = revisions


 class RevisionEntry:
     def __init__(
         self,
         id: bytes,
         date: Optional[datetime] = None,
         root: Optional[bytes] = None,
         parents: Optional[Iterable[bytes]] = None,
     ):
         self.id = id
         self.date = date
         assert self.date is None or self.date.tzinfo is not None
         self.root = root
         self._parents = parents
         self._nodes: List[RevisionEntry] = []

     def parents(self, archive: ArchiveInterface):
         if self._parents is None:
             revision = archive.revision_get([self.id])
             if revision:
                 self._parents = revision[0].parents
         if self._parents and not self._nodes:
             self._nodes = [
                 RevisionEntry(
                     id=rev.id,
                     root=rev.directory,
                     date=rev.date,
                     parents=rev.parents,
                 )
                 for rev in archive.revision_get(self._parents)
                 if rev
             ]
         yield from self._nodes

     def __str__(self):
         return f""


 class DirectoryEntry:
     def __init__(self, id: bytes, name: bytes = b""):
         self.id = id
         self.name = name
-        self._children: Optional[List[Union[DirectoryEntry, FileEntry]]] = None
+        self._files: Optional[List[FileEntry]] = None
+        self._dirs: Optional[List[DirectoryEntry]] = None

-    def ls(self, archive: ArchiveInterface):
-        if self._children is None:
-            self._children = []
+    def retrieve_children(self, archive: ArchiveInterface):
+        if self._files is None and self._dirs is None:
+            self._files = []
+            self._dirs = []
             for child in archive.directory_ls(self.id):
                 if child["type"] == "dir":
-                    self._children.append(
+                    self._dirs.append(
                         DirectoryEntry(child["target"], name=child["name"])
                     )
                 elif child["type"] == "file":
-                    self._children.append(FileEntry(child["target"], child["name"]))
-        yield from self._children
+                    self._files.append(FileEntry(child["target"], child["name"]))
+
+    @property
+    def files(self) -> Iterator["FileEntry"]:
+        if self._files is None:
+            raise RuntimeError(
+                "Children of this node have not yet been retrieved. "
+                "Please call retrieve_children() before using this property."
+            )
+        return (x for x in self._files)
+
+    @property
+    def dirs(self) -> Iterator["DirectoryEntry"]:
+        if self._dirs is None:
+            raise RuntimeError(
+                "Children of this node have not yet been retrieved. "
+                "Please call retrieve_children() before using this property."
+            )
+        return (x for x in self._dirs)

     def __str__(self):
         return f""


 class FileEntry:
     def __init__(self, id: bytes, name: bytes):
         self.id = id
         self.name = name

     def __str__(self):
         return f""
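Note: a minimal usage sketch of the new two-step API introduced above. The `archive` object is an assumed `ArchiveInterface` implementation, and the directory id is made up; the point is that children must be fetched explicitly before the `files`/`dirs` properties can be iterated.

    # Sketch only: assumes `archive` is an available ArchiveInterface instance.
    root = DirectoryEntry(id=bytes.fromhex("8f" * 20))
    root.retrieve_children(archive)  # one archive.directory_ls() call, cached
    for f in root.files:             # generator over the cached FileEntry children
        print(f.name)
    for d in root.dirs:              # generator over the cached DirectoryEntry children
        print(d.name)
    # Accessing root.files or root.dirs before retrieve_children() raises RuntimeError.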
date = self.insert_cache["content"].get(blob.id, None) if date is not None: dates[blob.id] = date else: # If not, check whether it's been query before date = self.select_cache["content"].get(blob.id, None) if date is not None: dates[blob.id] = date else: pending.append(blob.id) if pending: # Otherwise, query the database and cache the values values = ", ".join(itertools.repeat("%s", len(pending))) self.cursor.execute( f"""SELECT sha1, date FROM content WHERE sha1 IN ({values})""", tuple(pending), ) for sha1, date in self.cursor.fetchall(): dates[sha1] = date self.select_cache["content"][sha1] = date return dates def content_set_early_date(self, blob: FileEntry, date: datetime): self.insert_cache["content"][blob.id] = date def directory_get_date_in_isochrone_frontier( self, directory: DirectoryEntry ) -> Optional[datetime]: # First check if the date is being modified by current transection. date = self.insert_cache["directory"].get(directory.id, None) if date is None and directory.id not in self.remove_cache["directory"]: # If not, check whether it's been query before date = self.select_cache["directory"].get(directory.id, None) if date is None: # Otherwise, query the database and cache the value self.cursor.execute( """SELECT date FROM directory WHERE sha1=%s""", (directory.id,) ) row = self.cursor.fetchone() date = row[0] if row is not None else None self.select_cache["directory"][directory.id] = date return date def directory_get_dates_in_isochrone_frontier( - self, dirs: List[DirectoryEntry] + self, dirs: Iterable[DirectoryEntry] ) -> Dict[bytes, datetime]: dates = {} pending = [] for directory in dirs: # First check if the date is being modified by current transection. date = self.insert_cache["directory"].get(directory.id, None) if date is not None: dates[directory.id] = date elif directory.id not in self.remove_cache["directory"]: # If not, check whether it's been query before date = self.select_cache["directory"].get(directory.id, None) if date is not None: dates[directory.id] = date else: pending.append(directory.id) if pending: # Otherwise, query the database and cache the values values = ", ".join(itertools.repeat("%s", len(pending))) self.cursor.execute( f"""SELECT sha1, date FROM directory WHERE sha1 IN ({values})""", tuple(pending), ) for sha1, date in self.cursor.fetchall(): dates[sha1] = date self.select_cache["directory"][sha1] = date return dates def directory_invalidate_in_isochrone_frontier(self, directory: DirectoryEntry): self.remove_cache["directory"].add(directory.id) self.insert_cache["directory"].pop(directory.id, None) def directory_set_date_in_isochrone_frontier( self, directory: DirectoryEntry, date: datetime ): self.insert_cache["directory"][directory.id] = date self.remove_cache["directory"].discard(directory.id) def insert_all(self): # Performe insertions with cached information if self.insert_cache["content"]: psycopg2.extras.execute_values( self.cursor, """ LOCK TABLE ONLY content; INSERT INTO content(sha1, date) VALUES %s ON CONFLICT (sha1) DO UPDATE SET date=LEAST(EXCLUDED.date,content.date) """, self.insert_cache["content"].items(), ) self.insert_cache["content"].clear() if self.insert_cache["directory"]: psycopg2.extras.execute_values( self.cursor, """ LOCK TABLE ONLY directory; INSERT INTO directory(sha1, date) VALUES %s ON CONFLICT (sha1) DO UPDATE SET date=LEAST(EXCLUDED.date,directory.date) """, self.insert_cache["directory"].items(), ) self.insert_cache["directory"].clear() if self.insert_cache["revision"]: psycopg2.extras.execute_values( 
                 self.cursor,
                 """
                 LOCK TABLE ONLY revision;
                 INSERT INTO revision(sha1, date) VALUES %s
                 ON CONFLICT (sha1) DO
                 UPDATE SET date=LEAST(EXCLUDED.date,revision.date)
                 """,
                 self.insert_cache["revision"].items(),
             )
             self.insert_cache["revision"].clear()
         # Relations should come after ids for elements have been resolved.
         if self.insert_cache["content_early_in_rev"]:
             self.insert_location("content", "revision", "content_early_in_rev")
         if self.insert_cache["content_in_dir"]:
             self.insert_location("content", "directory", "content_in_dir")
         if self.insert_cache["directory_in_rev"]:
             self.insert_location("directory", "revision", "directory_in_rev")
         # if self.insert_cache["revision_before_rev"]:
         #     psycopg2.extras.execute_values(
         #         self.cursor,
         #         """
         #         LOCK TABLE ONLY revision_before_rev;
         #         INSERT INTO revision_before_rev VALUES %s
         #         ON CONFLICT DO NOTHING
         #         """,
         #         self.insert_cache["revision_before_rev"],
         #     )
         #     self.insert_cache["revision_before_rev"].clear()
         # if self.insert_cache["revision_in_org"]:
         #     psycopg2.extras.execute_values(
         #         self.cursor,
         #         """
         #         LOCK TABLE ONLY revision_in_org;
         #         INSERT INTO revision_in_org VALUES %s
         #         ON CONFLICT DO NOTHING
         #         """,
         #         self.insert_cache["revision_in_org"],
         #     )
         #     self.insert_cache["revision_in_org"].clear()

     def origin_get_id(self, origin: OriginEntry) -> int:
         if origin.id is None:
             # Insert origin in the DB and return the assigned id
             self.cursor.execute(
                 """
                 LOCK TABLE ONLY origin;
                 INSERT INTO origin(url) VALUES (%s)
                 ON CONFLICT DO NOTHING
                 RETURNING id
                 """,
                 (origin.url,),
             )
             return self.cursor.fetchone()[0]
         else:
             return origin.id

     def revision_add(self, revision: RevisionEntry):
         # Add current revision to the compact DB
         self.insert_cache["revision"][revision.id] = revision.date

     def revision_add_before_revision(
         self, relative: RevisionEntry, revision: RevisionEntry
     ):
         self.insert_cache["revision_before_rev"].append((revision.id, relative.id))

     def revision_add_to_origin(self, origin: OriginEntry, revision: RevisionEntry):
         self.insert_cache["revision_in_org"].append((revision.id, origin.id))

     def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]:
         date = self.insert_cache["revision"].get(revision.id, None)
         if date is None:
             # If not, check whether it has been queried before.
             date = self.select_cache["revision"].get(revision.id, None)
             if date is None:
                 # Otherwise, query the database and cache the value.
                 self.cursor.execute(
                     """SELECT date FROM revision WHERE sha1=%s""", (revision.id,)
                 )
                 row = self.cursor.fetchone()
                 date = row[0] if row is not None else None
                 self.select_cache["revision"][revision.id] = date
         return date

     def revision_get_preferred_origin(self, revision: RevisionEntry) -> int:
         # TODO: adapt this method to consider cached values
         self.cursor.execute(
             """SELECT COALESCE(org,0) FROM revision WHERE sha1=%s""", (revision.id,)
         )
         row = self.cursor.fetchone()
         # None means revision is not in database;
         # 0 means revision has no preferred origin
         return row[0] if row is not None and row[0] != 0 else None

     def revision_in_history(self, revision: RevisionEntry) -> bool:
         # TODO: adapt this method to consider cached values
         self.cursor.execute(
             """
             SELECT 1
             FROM revision_before_rev
             JOIN revision
             ON revision.id=revision_before_rev.prev
             WHERE revision.sha1=%s
             """,
             (revision.id,),
         )
         return self.cursor.fetchone() is not None

     def revision_set_preferred_origin(
         self, origin: OriginEntry, revision: RevisionEntry
     ):
         # TODO: adapt this method to consider cached values
         self.cursor.execute(
             """UPDATE revision SET org=%s WHERE sha1=%s""", (origin.id, revision.id)
         )
     def revision_visited(self, revision: RevisionEntry) -> bool:
         # TODO: adapt this method to consider cached values
         self.cursor.execute(
             """
             SELECT 1
             FROM revision_in_org
             JOIN revision
             ON revision.id=revision_in_org.rev
             WHERE revision.sha1=%s
             """,
             (revision.id,),
         )
         return self.cursor.fetchone() is not None
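Note: the `*_get_*` methods above all follow the same three-tier lookup, which the batched variants replicate with a single IN query for the misses. A condensed sketch of the pattern (method and key names are simplified for illustration, not part of the patch):

    def cached_date_lookup(self, sha1: bytes):
        # 1. Pending writes from the current transaction win.
        date = self.insert_cache["content"].get(sha1)
        if date is None:
            # 2. Then values already fetched during this session.
            date = self.select_cache["content"].get(sha1)
            if date is None:
                # 3. Finally, hit the database and remember the answer
                #    (None is cached too, to avoid re-querying misses).
                self.cursor.execute("SELECT date FROM content WHERE sha1=%s", (sha1,))
                row = self.cursor.fetchone()
                date = row[0] if row is not None else None
                self.select_cache["content"][sha1] = date
        return date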
diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py
index a2e0a90..6b36662 100644
--- a/swh/provenance/provenance.py
+++ b/swh/provenance/provenance.py
@@ -1,504 +1,496 @@
 from datetime import datetime, timezone
 import logging
 import os
 import time
-from typing import Dict, Generator, List, Optional, Tuple
+from typing import Dict, Generator, Iterable, List, Optional, Tuple, Union

 from typing_extensions import Protocol, runtime_checkable

 from swh.model.hashutil import hash_to_hex

 from .archive import ArchiveInterface
 from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry

 UTCMIN = datetime.min.replace(tzinfo=timezone.utc)


 @runtime_checkable
 class ProvenanceInterface(Protocol):
     raise_on_commit: bool = False

     def commit(self):
         """Commit currently ongoing transactions in the backend DB"""
         ...

     def content_add_to_directory(
         self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes
     ) -> None:
         ...

     def content_add_to_revision(
         self, revision: RevisionEntry, blob: FileEntry, prefix: bytes
     ) -> None:
         ...

     def content_find_first(
         self, blobid: bytes
     ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]:
         ...

     def content_find_all(
         self, blobid: bytes, limit: Optional[int] = None
     ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]:
         ...

     def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]:
         ...

-    def content_get_early_dates(self, blobs: List[FileEntry]) -> Dict[bytes, datetime]:
+    def content_get_early_dates(
+        self, blobs: Iterable[FileEntry]
+    ) -> Dict[bytes, datetime]:
         ...

     def content_set_early_date(self, blob: FileEntry, date: datetime) -> None:
         ...

     def directory_add_to_revision(
         self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes
     ) -> None:
         ...

     def directory_get_date_in_isochrone_frontier(
         self, directory: DirectoryEntry
     ) -> Optional[datetime]:
         ...

     def directory_get_dates_in_isochrone_frontier(
-        self, dirs: List[DirectoryEntry]
+        self, dirs: Iterable[DirectoryEntry]
     ) -> Dict[bytes, datetime]:
         ...

     def directory_invalidate_in_isochrone_frontier(
         self, directory: DirectoryEntry
     ) -> None:
         ...

     def directory_set_date_in_isochrone_frontier(
         self, directory: DirectoryEntry, date: datetime
     ) -> None:
         ...

     def origin_get_id(self, origin: OriginEntry) -> int:
         ...

     def revision_add(self, revision: RevisionEntry) -> None:
         ...

     def revision_add_before_revision(
         self, relative: RevisionEntry, revision: RevisionEntry
     ) -> None:
         ...

     def revision_add_to_origin(
         self, origin: OriginEntry, revision: RevisionEntry
     ) -> None:
         ...

     def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]:
         ...

     def revision_get_preferred_origin(self, revision: RevisionEntry) -> int:
         ...

     def revision_in_history(self, revision: RevisionEntry) -> bool:
         ...

     def revision_set_preferred_origin(
         self, origin: OriginEntry, revision: RevisionEntry
     ) -> None:
         ...

     def revision_visited(self, revision: RevisionEntry) -> bool:
         ...


 def flatten_directory(
     archive: ArchiveInterface,
     provenance: ProvenanceInterface,
     directory: DirectoryEntry,
 ) -> None:
     stack = [(directory, b"")]
     while stack:
         current, prefix = stack.pop()
-        for child in current.ls(archive):
-            if isinstance(child, FileEntry):
-                # Add content to the directory with the computed prefix.
-                provenance.content_add_to_directory(directory, child, prefix)
-            elif isinstance(child, DirectoryEntry):
-                # Recursively walk the child directory.
-                stack.append((child, os.path.join(prefix, child.name)))
+        current.retrieve_children(archive)
+        for f_child in current.files:
+            # Add content to the directory with the computed prefix.
+            provenance.content_add_to_directory(directory, f_child, prefix)
+        for d_child in current.dirs:
+            # Recursively walk the child directory.
+            stack.append((d_child, os.path.join(prefix, d_child.name)))


 def origin_add(
     archive: ArchiveInterface, provenance: ProvenanceInterface, origin: OriginEntry
 ) -> None:
     # TODO: refactor to iterate over origin visit statuses and commit only once
     # per status.
     origin.id = provenance.origin_get_id(origin)
     for revision in origin.revisions:
         origin_add_revision(archive, provenance, origin, revision)
         # Commit after each revision
         provenance.commit()  # TODO: verify this!


 def origin_add_revision(
     archive: ArchiveInterface,
     provenance: ProvenanceInterface,
     origin: OriginEntry,
     revision: RevisionEntry,
 ) -> None:
     stack: List[Tuple[Optional[RevisionEntry], RevisionEntry]] = [(None, revision)]
     while stack:
         relative, current = stack.pop()
         # Check if current revision has no preferred origin and update if necessary.
         preferred = provenance.revision_get_preferred_origin(current)
         if preferred is None:
             provenance.revision_set_preferred_origin(origin, current)
         ########################################################################
         if relative is None:
             # This revision is pointed directly by the origin.
             visited = provenance.revision_visited(current)
             provenance.revision_add_to_origin(origin, current)
             if not visited:
                 stack.append((current, current))
         else:
             # This revision is a parent of another one in the history of the
             # relative revision.
             for parent in current.parents(archive):
                 visited = provenance.revision_visited(parent)
                 if not visited:
                     # The parent revision has never been seen before pointing
                     # directly to an origin.
                     known = provenance.revision_in_history(parent)
                     if known:
                         # The parent revision is already known in some other
                         # revision's history. We should point it directly to
                         # the origin and (eventually) walk its history.
                         stack.append((None, parent))
                     else:
                         # The parent revision was never seen before. We should
                         # walk its history and associate it with the same
                         # relative revision.
                         provenance.revision_add_before_revision(relative, parent)
                         stack.append((relative, parent))
                 else:
                     # The parent revision already points to an origin, so its
                     # history was properly processed before. We just need to
                     # make sure it points to the current origin as well.
                     provenance.revision_add_to_origin(origin, parent)


 def revision_add(
     provenance: ProvenanceInterface,
     archive: ArchiveInterface,
     revisions: List[RevisionEntry],
     trackall: bool = True,
     lower: bool = True,
     mindepth: int = 1,
 ) -> None:
     start = time.time()
     for revision in revisions:
         assert revision.date is not None
         assert revision.root is not None
         # Process content starting from the revision's root directory.
         date = provenance.revision_get_early_date(revision)
         if date is None or revision.date < date:
             logging.debug(
                 f"Processing revision {hash_to_hex(revision.id)}"
                 f" (known date {date} / revision date {revision.date})..."
             )
             graph = build_isochrone_graph(
                 archive,
                 provenance,
                 revision,
                 DirectoryEntry(revision.root),
             )
             # TODO: add file size filtering
             revision_process_content(
                 archive,
                 provenance,
                 revision,
                 graph,
                 trackall=trackall,
                 lower=lower,
                 mindepth=mindepth,
             )
     done = time.time()
     # TODO: improve this! Maybe using a max attempt counter?
     # Ideally Provenance class should guarantee that a commit never fails.
     while not provenance.commit():
         logging.warning(
             "Could not commit revisions "
             + ";".join([hash_to_hex(revision.id) for revision in revisions])
             + ". Retrying..."
         )
     stop = time.time()
     logging.debug(
         f"Revisions {';'.join([hash_to_hex(revision.id) for revision in revisions])}"
         f" were processed in {stop - start} secs (commit took {stop - done} secs)!"
     )
     # logging.critical(
     #     ";".join([hash_to_hex(revision.id) for revision in revisions])
     #     + f",{stop - start},{stop - done}"
     # )
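Note: one possible shape for the max-attempt counter that the TODO above mentions. This is only a sketch, not part of the patch; `MAX_ATTEMPTS` is a made-up name, and raising on exhaustion is one of several reasonable policies.

    MAX_ATTEMPTS = 3  # hypothetical bound on commit retries

    for attempt in range(MAX_ATTEMPTS):
        if provenance.commit():
            break
        logging.warning(
            "Commit failed (attempt %s/%s), retrying...", attempt + 1, MAX_ATTEMPTS
        )
    else:
        # for/else: only reached when no attempt succeeded.
        raise RuntimeError(f"Could not commit after {MAX_ATTEMPTS} attempts")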
 class IsochroneNode:
     def __init__(
         self,
-        entry: DirectoryEntry,
+        entry: Union[DirectoryEntry, FileEntry],
         date: Optional[datetime] = None,
         depth: int = 0,
         prefix: bytes = b"",
     ):
         self.entry = entry
         self.depth = depth
         self.date = date
         self.known = self.date is not None
         self.children: List[IsochroneNode] = []
         self.maxdate: Optional[datetime] = None
         self.path = os.path.join(prefix, self.entry.name) if prefix else self.entry.name

     def add_child(
-        self, child: DirectoryEntry, date: Optional[datetime] = None
+        self, child: Union[FileEntry, DirectoryEntry], date: Optional[datetime] = None
     ) -> "IsochroneNode":
         assert isinstance(self.entry, DirectoryEntry) and self.date is None
         node = IsochroneNode(
             child,
             date=date,
             depth=self.depth + 1,
             prefix=self.path,
         )
         self.children.append(node)
         return node


 def build_isochrone_graph(
     archive: ArchiveInterface,
     provenance: ProvenanceInterface,
     revision: RevisionEntry,
     directory: DirectoryEntry,
 ) -> IsochroneNode:
     assert revision.date is not None
     assert revision.root == directory.id

     # Build the nodes structure
     root_date = provenance.directory_get_date_in_isochrone_frontier(directory)
     root = IsochroneNode(directory, date=root_date)
     stack = [root]
     logging.debug(
         f"Recursively creating graph for revision {hash_to_hex(revision.id)}..."
     )
     while stack:
         current = stack.pop()
         assert isinstance(current.entry, DirectoryEntry)
         if current.date is None or current.date > revision.date:
             # If the current directory has an associated date in the isochrone
             # frontier that is greater than or equal to the current revision's date,
             # it should be ignored as the revision is being processed out of order.
             if current.date is not None and current.date > revision.date:
                 logging.debug(
                     f"Invalidating frontier on {hash_to_hex(current.entry.id)}"
                     f" (date {current.date})"
                     f" when processing revision {hash_to_hex(revision.id)}"
                     f" (date {revision.date})"
                 )
                 provenance.directory_invalidate_in_isochrone_frontier(current.entry)
                 current.date = None
                 current.known = False
             # Pre-query all known dates for content/directories in the current
             # directory for the provenance object to have them cached and
             # (potentially) improve performance.
+            current.entry.retrieve_children(archive)
             ddates = provenance.directory_get_dates_in_isochrone_frontier(
-                [
-                    child
-                    for child in current.entry.ls(archive)
-                    if isinstance(child, DirectoryEntry)
-                ]
+                current.entry.dirs
             )
-            fdates = provenance.content_get_early_dates(
-                [
-                    child
-                    for child in current.entry.ls(archive)
-                    if isinstance(child, FileEntry)
-                ]
-            )
-            for child in current.entry.ls(archive):
+            fdates = provenance.content_get_early_dates(current.entry.files)
+            for d_child in current.entry.dirs:
                 # Recursively analyse directory nodes.
-                if isinstance(child, DirectoryEntry):
-                    node = current.add_child(child, date=ddates.get(child.id))
-                    stack.append(node)
-                else:
-                    # WARNING: there is a type checking issue here!
-                    current.add_child(child, date=fdates.get(child.id))
+                node = current.add_child(d_child, date=ddates.get(d_child.id))
+                stack.append(node)
+            for f_child in current.entry.files:
+                # WARNING: there is a type checking issue here!
+                current.add_child(f_child, date=fdates.get(f_child.id))
     logging.debug(
         f"Isochrone graph for revision {hash_to_hex(revision.id)} successfully created!"
     )
     # Precalculate max known date for each node in the graph (only directory nodes
     # are pushed to the stack).
     stack = [root]
     logging.debug(f"Computing maxdates for revision {hash_to_hex(revision.id)}...")
     while stack:
         current = stack.pop()
         # Current directory node is known if it already has an assigned date (i.e.
         # it was already seen as an isochrone frontier).
         if not current.known:
             if any(map(lambda child: child.maxdate is None, current.children)):
                 # Current node needs to be analysed again after its children.
                 stack.append(current)
                 for child in current.children:
                     if isinstance(child.entry, FileEntry):
                         # A file node is known if it already has an assigned date
                         # (i.e. it was processed before).
                         if child.known:
                             assert child.date is not None
                             # Just use its known date.
                             child.maxdate = child.date
                         else:
                             # Use current revision date.
                             child.maxdate = revision.date
                     else:
                         stack.append(child)
             else:
                 maxdates = [
                     child.maxdate
                     for child in current.children
                     if child.maxdate is not None  # mostly to please mypy
                 ]
                 current.maxdate = max(maxdates) if maxdates else UTCMIN
                 # If all content is already known, update current directory info.
                 current.known = all(map(lambda child: child.known, current.children))
         else:
             # Directory node in the frontier, just use its known date.
             current.maxdate = current.date
     logging.debug(
         f"Maxdates for revision {hash_to_hex(revision.id)} successfully computed!"
     )
     return root
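Note: the maxdate bookkeeping above can be summarized with a toy calculation (dates are illustrative, not part of the patch): a known file contributes its stored date, an unseen file contributes the revision's date, and a directory takes the maximum over its children.

    from datetime import datetime, timezone

    rev_date = datetime(2021, 6, 1, tzinfo=timezone.utc)
    seen_blob = datetime(2020, 1, 1, tzinfo=timezone.utc)  # known: keeps its own date
    unseen_blob = rev_date                                 # unknown: defaults to rev_date
    dir_maxdate = max([seen_blob, unseen_blob])
    # dir_maxdate == rev_date, so with trackall=False is_new_frontier() would
    # reject this directory (it requires maxdate strictly earlier than rev_date).
    assert dir_maxdate == rev_date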
 def revision_process_content(
     archive: ArchiveInterface,
     provenance: ProvenanceInterface,
     revision: RevisionEntry,
     graph: IsochroneNode,
     trackall: bool = True,
     lower: bool = True,
     mindepth: int = 1,
 ):
     assert revision.date is not None
     provenance.revision_add(revision)

     stack = [graph]
     while stack:
         current = stack.pop()
         assert isinstance(current.entry, DirectoryEntry)
         if current.date is not None:
             assert current.date <= revision.date
             if trackall:
                 # Current directory is an outer isochrone frontier for a previously
                 # processed revision. It should be reused as is.
                 provenance.directory_add_to_revision(
                     revision, current.entry, current.path
                 )
         else:
             # Current directory is not an outer isochrone frontier for any previous
             # revision. It might be eligible for this one.
             if is_new_frontier(
                 current, revision, trackall=trackall, lower=lower, mindepth=mindepth
             ):
                 assert current.maxdate is not None
                 # The outer frontier should be moved to the current position in the
                 # isochrone graph. This is the first time this directory is found
                 # in the isochrone frontier.
                 provenance.directory_set_date_in_isochrone_frontier(
                     current.entry, current.maxdate
                 )
                 if trackall:
                     provenance.directory_add_to_revision(
                         revision, current.entry, current.path
                     )
                     flatten_directory(archive, provenance, current.entry)
             else:
                 # No point moving the frontier here. Either there are no files or
                 # they are being seen for the first time here. Add all blobs to the
                 # current revision, updating dates if necessary, and recursively
                 # analyse subdirectories as candidates to the outer frontier.
                 for child in current.children:
                     if isinstance(child.entry, FileEntry):
                         blob = child.entry
                         if child.date is None or revision.date < child.date:
                             provenance.content_set_early_date(blob, revision.date)
                         provenance.content_add_to_revision(
                             revision, blob, current.path
                         )
                     else:
                         stack.append(child)


 def is_new_frontier(
     node: IsochroneNode,
     revision: RevisionEntry,
     trackall: bool = True,
     lower: bool = True,
     mindepth: int = 1,
 ) -> bool:
     assert node.maxdate is not None and revision.date is not None
     if trackall:
         # The only real condition for a directory to be a frontier is that its
         # content is already known and its maxdate is less than (or equal to) the
         # current revision's date. Checking mindepth is meant to skip root
         # directories (or any arbitrary depth) to improve the result. The lower
         # option tries to maximize the reuse rate of previously defined frontiers
         # by keeping them low in the directory tree.
         return (
             node.known  # all content in node was already seen before
             and node.maxdate <= revision.date  # all content is earlier than revision
             and node.depth >= mindepth  # deeper than the min allowed depth
             and (has_blobs(node) if lower else True)  # there is at least one blob
         )
     else:
         # If we are only tracking first occurrences, we want to ensure that all
         # first occurrences end up in the content_early_in_rev relation. Thus, we
         # force every blob outside a frontier to have a strictly earlier date.
         return (
             node.maxdate < revision.date  # all content is earlier than revision
             and node.depth >= mindepth  # deeper than the min allowed depth
             and (has_blobs(node) if lower else True)  # there is at least one blob
         )


 def has_blobs(node: IsochroneNode) -> bool:
     # We may want to look for files in different ways to decide whether to define a
     # frontier or not:
     # 1. Only files in current node:
     return any(map(lambda child: isinstance(child.entry, FileEntry), node.children))
     # 2. Files anywhere in the isochrone graph
     # stack = [node]
     # while stack:
     #     current = stack.pop()
     #     if any(
     #         map(lambda child: isinstance(child.entry, FileEntry), current.children)
     #     ):
     #         return True
     #     else:
     #         # All children are directory entries.
     #         stack.extend(current.children)
     # return False
     # 3. Files in the intermediate directories between current node and any
     #    previously defined frontier:
     # TODO: complete this case!
     # return any(
     #     map(lambda child: isinstance(child.entry, FileEntry), node.children)
     # ) or all(
     #     map(
     #         lambda child: (
     #             not (isinstance(child.entry, DirectoryEntry) and child.date is None)
     #         )
     #         or has_blobs(child),
     #         node.children,
     #     )
     # )
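Note: a quick illustration of how the is_new_frontier() knobs interact, building on the classes in this patch. The values are made up; the node is marked known with an old maxdate, sits at depth 2, and has no direct blob children.

    from datetime import datetime, timezone

    node = IsochroneNode(DirectoryEntry(b"\x00" * 20, b"src"), depth=2)
    node.known = True
    node.maxdate = datetime(2020, 1, 1, tzinfo=timezone.utc)
    revision = RevisionEntry(
        id=b"\x01" * 20, date=datetime(2021, 6, 1, tzinfo=timezone.utc)
    )
    # Known, old enough, and deep enough; lower=False skips the blob check.
    is_new_frontier(node, revision, trackall=True, lower=False, mindepth=2)  # True
    # lower=True additionally requires a blob directly in the node: none here.
    is_new_frontier(node, revision, trackall=True, lower=True, mindepth=2)   # False
    # mindepth=3 rejects the node as too shallow regardless of its dates.
    is_new_frontier(node, revision, mindepth=3)                              # False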