diff --git a/swh/provenance/cli.py b/swh/provenance/cli.py
--- a/swh/provenance/cli.py
+++ b/swh/provenance/cli.py
@@ -129,8 +129,7 @@
     # TODO: add file size filtering
     """Process a provided list of revisions."""
     from . import get_archive, get_provenance
-    from .provenance import revision_add
-    from .revision import CSVRevisionIterator
+    from .revision import CSVRevisionIterator, revision_add

     archive = get_archive(**ctx.obj["config"]["archive"])
     provenance = get_provenance(**ctx.obj["config"]["provenance"])
@@ -157,8 +156,7 @@
 def iter_origins(ctx, filename, limit):
     """Process a provided list of origins."""
     from . import get_archive, get_provenance
-    from .origin import CSVOriginIterator
-    from .provenance import origin_add
+    from .origin import CSVOriginIterator, origin_add

     archive = get_archive(**ctx.obj["config"]["archive"])
     provenance = get_provenance(**ctx.obj["config"]["provenance"])
@@ -200,7 +198,7 @@
 @click.pass_context
 def find_all(ctx, swhid, limit):
     """Find all occurrences of the requested blob."""
-    from swh.provenance import get_provenance
+    from . import get_provenance

     provenance = get_provenance(**ctx.obj["config"]["provenance"])
     # TODO: return a dictionary with proper keys for each field
diff --git a/swh/provenance/graph.py b/swh/provenance/graph.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/graph.py
@@ -0,0 +1,223 @@
+from collections import Counter
+from datetime import datetime, timezone
+import logging
+import os
+from typing import Dict, List, Optional
+
+from swh.model.hashutil import hash_to_hex
+
+from .archive import ArchiveInterface
+from .model import DirectoryEntry, RevisionEntry
+from .provenance import ProvenanceInterface
+
+UTCMIN = datetime.min.replace(tzinfo=timezone.utc)
+
+
+class IsochroneNode:
+    def __init__(
+        self,
+        entry: DirectoryEntry,
+        dbdate: Optional[datetime] = None,
+        depth: int = 0,
+        prefix: bytes = b"",
+    ):
+        self.entry = entry
+        self.depth = depth
+
+        # dbdate is the maxdate for this node that comes from the DB
+        self._dbdate: Optional[datetime] = dbdate
+
+        # maxdate is set by the maxdate computation algorithm
+        self.maxdate: Optional[datetime] = None
+
+        # known is True if this node is already known in the db; either because
+        # the current directory actually exists in the database, or because all
+        # the content of the current directory is known (subdirectories and files)
+        self.known = self.dbdate is not None
+        self.invalid = False
+        self.path = os.path.join(prefix, self.entry.name) if prefix else self.entry.name
+        self.children: List[IsochroneNode] = []
+
+    @property
+    def dbdate(self):
+        # use a property to make this attribute (mostly) read-only
+        return self._dbdate
+
+    def invalidate(self):
+        self._dbdate = None
+        self.maxdate = None
+        self.known = False
+        self.invalid = True
+
+    def add_directory(
+        self, child: DirectoryEntry, date: Optional[datetime] = None
+    ) -> "IsochroneNode":
+        # we should not be processing this node (i.e. adding subdirectories or
+        # files) if it's actually known by the provenance DB
+        assert self.dbdate is None
+        node = IsochroneNode(child, dbdate=date, depth=self.depth + 1, prefix=self.path)
+        self.children.append(node)
+        return node
+
+    def __str__(self):
+        return (
+            f"<{self.entry}: dbdate={self.dbdate}, maxdate={self.maxdate}, "
+            f"known={self.known}, invalid={self.invalid}, path={self.path}, "
+            f"children=[{', '.join(str(child) for child in self.children)}]>"
+        )
+
+    def __eq__(self, other):
+        return (
+            isinstance(other, IsochroneNode)
+            and (
+                self.entry,
+                self.depth,
+                self._dbdate,
+                self.maxdate,
+                self.known,
+                self.invalid,
+                self.path,
+            )
+            == (
+                other.entry,
+                other.depth,
+                other._dbdate,
+                other.maxdate,
+                other.known,
+                other.invalid,
+                other.path,
+            )
+            and Counter(self.children) == Counter(other.children)
+        )
+
+    def __hash__(self):
+        return hash(
+            (
+                self.entry,
+                self.depth,
+                self._dbdate,
+                self.maxdate,
+                self.known,
+                self.invalid,
+                self.path,
+            )
+        )
+
+
+def build_isochrone_graph(
+    archive: ArchiveInterface,
+    provenance: ProvenanceInterface,
+    revision: RevisionEntry,
+    directory: DirectoryEntry,
+) -> IsochroneNode:
+    assert revision.date is not None
+    assert revision.root == directory.id
+
+    # this function processes a revision in 2 steps:
+    #
+    # 1. build the tree structure of IsochroneNode objects (one INode per
+    #    directory under the root directory of the revision but not following
+    #    known subdirectories), and gather the dates from the DB for already
+    #    known objects; for files, just keep all the dates in a global 'fdates'
+    #    dict; note that in this step, we will only recurse into directories
+    #    that are not already known.
+    #
+    # 2. compute the maxdate for each node of the tree that was not found in the DB.
+
+    # Build the nodes structure
+    root_date = provenance.directory_get_date_in_isochrone_frontier(directory)
+    root = IsochroneNode(directory, dbdate=root_date)
+    stack = [root]
+    logging.debug(
+        f"Recursively creating graph for revision {hash_to_hex(revision.id)}..."
+    )
+    fdates: Dict[bytes, datetime] = {}  # map {file_id: date}
+    while stack:
+        current = stack.pop()
+        if current.dbdate is None or current.dbdate > revision.date:
+            # If the current directory has a date in the isochrone frontier that is
+            # strictly greater than the current revision's one, it should be ignored,
+            # as the revision is being processed out of order.
+            if current.dbdate is not None and current.dbdate > revision.date:
+                logging.debug(
+                    f"Invalidating frontier on {hash_to_hex(current.entry.id)}"
+                    f" (date {current.dbdate})"
+                    f" when processing revision {hash_to_hex(revision.id)}"
+                    f" (date {revision.date})"
+                )
+                current.invalidate()
+
+            # Pre-query all known dates for directories in the current directory
+            # for the provenance object to have them cached and (potentially) improve
+            # performance.
+            current.entry.retrieve_children(archive)
+            ddates = provenance.directory_get_dates_in_isochrone_frontier(
+                current.entry.dirs
+            )
+            for dir in current.entry.dirs:
+                # Recursively analyse subdirectory nodes
+                node = current.add_directory(dir, date=ddates.get(dir.id, None))
+                stack.append(node)
+
+            fdates.update(provenance.content_get_early_dates(current.entry.files))
+
+    logging.debug(
+        f"Isochrone graph for revision {hash_to_hex(revision.id)} successfully created!"
+    )
+    # Precalculate max known date for each node in the graph (only directory nodes are
+    # pushed to the stack).
+    logging.debug(f"Computing maxdates for revision {hash_to_hex(revision.id)}...")
+    stack = [root]
+
+    while stack:
+        current = stack.pop()
+        # Current directory node is known if it already has an assigned date (i.e. it
+        # was already seen as an isochrone frontier).
+        if current.known:
+            assert current.maxdate is None
+            current.maxdate = current.dbdate
+        else:
+            if any(x.maxdate is None for x in current.children):
+                # at least one child of current has no maxdate yet
+                # Current node needs to be analysed again after its children.
+                stack.append(current)
+                for child in current.children:
+                    if child.maxdate is None:
+                        # if child.maxdate is None, it must be processed
+                        stack.append(child)
+            else:
+                # all the files and directories under current have a maxdate,
+                # we can infer the maxdate for current directory
+                assert current.maxdate is None
+                # if all content is already known, update current directory info.
+                current.maxdate = max(
+                    [UTCMIN]
+                    + [
+                        child.maxdate
+                        for child in current.children
+                        if child.maxdate is not None  # unnecessary, but needed for mypy
+                    ]
+                    + [
+                        fdates.get(file.id, revision.date)
+                        for file in current.entry.files
+                    ]
+                )
+                if current.maxdate <= revision.date:
+                    current.known = (
+                        # true if all subdirectories are known
+                        all(child.known for child in current.children)
+                        # true if all files are in fdates, i.e. if all files were known
+                        # *before building this isochrone graph node*
+                        # Note: the 'all()' is lazy: will stop iterating as soon as
+                        # possible
+                        and all((file.id in fdates) for file in current.entry.files)
+                    )
+                else:
+                    # at least one content is being processed out of order, so the
+                    # current node should be treated as unknown
+                    current.maxdate = revision.date
+                    current.known = False
+    logging.debug(
+        f"Maxdates for revision {hash_to_hex(revision.id)} successfully computed!"
+    )
+    return root
diff --git a/swh/provenance/origin.py b/swh/provenance/origin.py
--- a/swh/provenance/origin.py
+++ b/swh/provenance/origin.py
@@ -1,13 +1,16 @@
 from datetime import datetime, timezone
 from itertools import islice
-from typing import Iterable, Iterator, Optional, Tuple
+import logging
+import time
+from typing import Iterable, Iterator, List, Optional, Tuple

 import iso8601

-from .model import OriginEntry
+from swh.model.hashutil import hash_to_hex

-################################################################################
-################################################################################
+from .archive import ArchiveInterface
+from .model import OriginEntry, RevisionEntry
+from .provenance import ProvenanceInterface


 class CSVOriginIterator:
@@ -39,3 +42,81 @@
         for url, date, snap in self.statuses:
             date = iso8601.parse_date(date, default_timezone=timezone.utc)
             yield OriginEntry(url, date, snap)
+
+
+def origin_add(
+    provenance: ProvenanceInterface,
+    archive: ArchiveInterface,
+    origins: List[OriginEntry],
+) -> None:
+    start = time.time()
+    for origin in origins:
+        origin.retrieve_revisions(archive)
+        for revision in origin.revisions:
+            origin_add_revision(provenance, archive, origin, revision)
+    done = time.time()
+    provenance.commit()
+    stop = time.time()
+    logging.debug(
+        "Origins "
+        + ";".join(
+            [origin.url + ":" + hash_to_hex(origin.snapshot) for origin in origins]
+        )
+        + f" were processed in {stop - start} secs (commit took {stop - done} secs)!"
+    )
+
+
+def origin_add_revision(
+    provenance: ProvenanceInterface,
+    archive: ArchiveInterface,
+    origin: OriginEntry,
+    revision: RevisionEntry,
+) -> None:
+    stack: List[Tuple[Optional[RevisionEntry], RevisionEntry]] = [(None, revision)]
+    origin.id = provenance.origin_get_id(origin)
+
+    while stack:
+        relative, current = stack.pop()
+
+        # Check if current revision has no preferred origin and update if necessary.
+        preferred = provenance.revision_get_preferred_origin(current)
+
+        if preferred is None:
+            provenance.revision_set_preferred_origin(origin, current)
+        ########################################################################
+
+        if relative is None:
+            # This revision is pointed to directly by the origin.
+            visited = provenance.revision_visited(current)
+            provenance.revision_add_to_origin(origin, current)
+
+            if not visited:
+                stack.append((current, current))
+
+        else:
+            # This revision is a parent of another one in the history of the
+            # relative revision.
+            for parent in current.parents(archive):
+                visited = provenance.revision_visited(parent)
+
+                if not visited:
+                    # The parent revision has never been seen before pointing
+                    # directly to an origin.
+                    known = provenance.revision_in_history(parent)
+
+                    if known:
+                        # The parent revision is already known in some other
+                        # revision's history. We should point it directly to
+                        # the origin and (eventually) walk its history.
+                        stack.append((None, parent))
+                    else:
+                        # The parent revision was never seen before. We should
+                        # walk its history and associate it with the same
+                        # relative revision.
+                        provenance.revision_add_before_revision(relative, parent)
+                        stack.append((relative, parent))
+                else:
+                    # The parent revision already points to an origin, so its
+                    # history was properly processed before. We just need to
+                    # make sure it points to the current origin as well.
+                    provenance.revision_add_to_origin(origin, parent)
diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py
--- a/swh/provenance/provenance.py
+++ b/swh/provenance/provenance.py
@@ -1,20 +1,13 @@
-from collections import Counter
-from datetime import datetime, timezone
+from datetime import datetime
 import logging
 import os
-import time
 from typing import Any, Dict, Generator, Iterable, List, Optional, Tuple

 import psycopg2
 from typing_extensions import Protocol, runtime_checkable

-from swh.model.hashutil import hash_to_hex
-
-from .archive import ArchiveInterface
 from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry

-UTCMIN = datetime.min.replace(tzinfo=timezone.utc)
-

 # XXX: this protocol doesn't make much sense now that flavours have been delegated to
 # another class, lower in the callstack.
@@ -275,492 +268,3 @@

 def normalize(path: bytes) -> bytes:
     return path[2:] if path.startswith(bytes("." + os.path.sep, "utf-8")) else path
-
-
-def flatten_directory(
-    archive: ArchiveInterface,
-    provenance: ProvenanceInterface,
-    directory: DirectoryEntry,
-) -> None:
-    """Recursively retrieve all the files of 'directory' and insert them in the
-    'provenance' database in the 'content_to_directory' table.
-    """
-    stack = [(directory, b"")]
-    while stack:
-        current, prefix = stack.pop()
-        current.retrieve_children(archive)
-        for f_child in current.files:
-            # Add content to the directory with the computed prefix.
-            provenance.content_add_to_directory(directory, f_child, prefix)
-        for d_child in current.dirs:
-            # Recursively walk the child directory.
-            stack.append((d_child, os.path.join(prefix, d_child.name)))
-
-
-def origin_add(
-    provenance: ProvenanceInterface,
-    archive: ArchiveInterface,
-    origins: List[OriginEntry],
-) -> None:
-    start = time.time()
-    for origin in origins:
-        origin.retrieve_revisions(archive)
-        for revision in origin.revisions:
-            origin_add_revision(provenance, archive, origin, revision)
-    done = time.time()
-    provenance.commit()
-    stop = time.time()
-    logging.debug(
-        "Origins "
-        ";".join(
-            [origin.url + ":" + hash_to_hex(origin.snapshot) for origin in origins]
-        )
-        + f" were processed in {stop - start} secs (commit took {stop - done} secs)!"
-    )
-
-
-def origin_add_revision(
-    provenance: ProvenanceInterface,
-    archive: ArchiveInterface,
-    origin: OriginEntry,
-    revision: RevisionEntry,
-) -> None:
-    stack: List[Tuple[Optional[RevisionEntry], RevisionEntry]] = [(None, revision)]
-    origin.id = provenance.origin_get_id(origin)
-
-    while stack:
-        relative, current = stack.pop()
-
-        # Check if current revision has no preferred origin and update if necessary.
-        preferred = provenance.revision_get_preferred_origin(current)
-
-        if preferred is None:
-            provenance.revision_set_preferred_origin(origin, current)
-        ########################################################################
-
-        if relative is None:
-            # This revision is pointed directly by the origin.
-            visited = provenance.revision_visited(current)
-            provenance.revision_add_to_origin(origin, current)
-
-            if not visited:
-                stack.append((current, current))
-
-        else:
-            # This revision is a parent of another one in the history of the
-            # relative revision.
-            for parent in current.parents(archive):
-                visited = provenance.revision_visited(parent)
-
-                if not visited:
-                    # The parent revision has never been seen before pointing
-                    # directly to an origin.
-                    known = provenance.revision_in_history(parent)
-
-                    if known:
-                        # The parent revision is already known in some other
-                        # revision's history. We should point it directly to
-                        # the origin and (eventually) walk its history.
-                        stack.append((None, parent))
-                    else:
-                        # The parent revision was never seen before. We should
-                        # walk its history and associate it with the same
-                        # relative revision.
-                        provenance.revision_add_before_revision(relative, parent)
-                        stack.append((relative, parent))
-                else:
-                    # The parent revision already points to an origin, so its
-                    # history was properly processed before. We just need to
-                    # make sure it points to the current origin as well.
-                    provenance.revision_add_to_origin(origin, parent)
-
-
-def revision_add(
-    provenance: ProvenanceInterface,
-    archive: ArchiveInterface,
-    revisions: List[RevisionEntry],
-    trackall: bool = True,
-    lower: bool = True,
-    mindepth: int = 1,
-    commit: bool = True,
-) -> None:
-    start = time.time()
-    for revision in revisions:
-        assert revision.date is not None
-        assert revision.root is not None
-        # Processed content starting from the revision's root directory.
-        date = provenance.revision_get_early_date(revision)
-        if date is None or revision.date < date:
-            logging.debug(
-                f"Processing revisions {hash_to_hex(revision.id)}"
-                f" (known date {date} / revision date {revision.date})..."
-            )
-            graph = build_isochrone_graph(
-                archive,
-                provenance,
-                revision,
-                DirectoryEntry(revision.root),
-            )
-            # TODO: add file size filtering
-            revision_process_content(
-                archive,
-                provenance,
-                revision,
-                graph,
-                trackall=trackall,
-                lower=lower,
-                mindepth=mindepth,
-            )
-    done = time.time()
-    if commit:
-        provenance.commit()
-    stop = time.time()
-    logging.debug(
-        f"Revisions {';'.join([hash_to_hex(revision.id) for revision in revisions])} "
-        f" were processed in {stop - start} secs (commit took {stop - done} secs)!"
-    )
-    # logging.critical(
-    #     ";".join([hash_to_hex(revision.id) for revision in revisions])
-    #     + f",{stop - start},{stop - done}"
-    # )
-
-
-class IsochroneNode:
-    def __init__(
-        self,
-        entry: DirectoryEntry,
-        dbdate: Optional[datetime] = None,
-        depth: int = 0,
-        prefix: bytes = b"",
-    ):
-        self.entry = entry
-        self.depth = depth
-
-        # dbdate is the maxdate for this node that comes from the DB
-        self._dbdate: Optional[datetime] = dbdate
-
-        # maxdate is set by the maxdate computation algorithm
-        self.maxdate: Optional[datetime] = None
-
-        # known is True if this node is already known in the db; either because
-        # the current directory actually exists in the database, or because all
-        # the content of the current directory is known (subdirectories and files)
-        self.known = self.dbdate is not None
-        self.invalid = False
-        self.path = os.path.join(prefix, self.entry.name) if prefix else self.entry.name
-        self.children: List[IsochroneNode] = []
-
-    @property
-    def dbdate(self):
-        # use a property to make this attribute (mostly) read-only
-        return self._dbdate
-
-    def invalidate(self):
-        self._dbdate = None
-        self.maxdate = None
-        self.known = False
-        self.invalid = True
-
-    def add_directory(
-        self, child: DirectoryEntry, date: Optional[datetime] = None
-    ) -> "IsochroneNode":
-        # we should not be processing this node (ie add subdirectories or
-        # files) if it's actually known by the provenance DB
-        assert self.dbdate is None
-        node = IsochroneNode(child, dbdate=date, depth=self.depth + 1, prefix=self.path)
-        self.children.append(node)
-        return node
-
-    def __str__(self):
-        return (
-            f"<{self.entry}: dbdate={self.dbdate}, maxdate={self.maxdate}, "
-            f"known={self.known}, invalid={self.invalid}, path={self.path}, "
-            f"children=[{', '.join(str(child) for child in self.children)}]>"
-        )
-
-    def __eq__(self, other):
-        return (
-            isinstance(other, IsochroneNode)
-            and (
-                self.entry,
-                self.depth,
-                self._dbdate,
-                self.maxdate,
-                self.known,
-                self.invalid,
-                self.path,
-            )
-            == (
-                other.entry,
-                other.depth,
-                other._dbdate,
-                other.maxdate,
-                other.known,
-                other.invalid,
-                other.path,
-            )
-            and Counter(self.children) == Counter(other.children)
-        )
-
-    def __hash__(self):
-        return hash(
-            (
-                self.entry,
-                self.depth,
-                self._dbdate,
-                self.maxdate,
-                self.known,
-                self.invalid,
-                self.path,
-            )
-        )
-
-
-def build_isochrone_graph(
-    archive: ArchiveInterface,
-    provenance: ProvenanceInterface,
-    revision: RevisionEntry,
-    directory: DirectoryEntry,
-) -> IsochroneNode:
-    assert revision.date is not None
-    assert revision.root == directory.id
-
-    # this function process a revision in 2 steps:
-    #
-    # 1. build the tree structure of IsochroneNode objects (one INode per
-    #    directory under the root directory of the revision but not following
-    #    known subdirectories), and gather the dates from the DB for already
-    #    known objects; for files, just keep all the dates in a global 'fdates'
-    #    dict; note that in this step, we will only recurse the directories
-    #    that are not already known.
-    #
-    # 2. compute the maxdate for each node of the tree that was not found in the DB.
-
-    # Build the nodes structure
-    root_date = provenance.directory_get_date_in_isochrone_frontier(directory)
-    root = IsochroneNode(directory, dbdate=root_date)
-    stack = [root]
-    logging.debug(
-        f"Recursively creating graph for revision {hash_to_hex(revision.id)}..."
-    )
-    fdates: Dict[bytes, datetime] = {}  # map {file_id: date}
-    while stack:
-        current = stack.pop()
-        if current.dbdate is None or current.dbdate > revision.date:
-            # If current directory has an associated date in the isochrone frontier that
-            # is greater or equal to the current revision's one, it should be ignored as
-            # the revision is being processed out of order.
-            if current.dbdate is not None and current.dbdate > revision.date:
-                logging.debug(
-                    f"Invalidating frontier on {hash_to_hex(current.entry.id)}"
-                    f" (date {current.dbdate})"
-                    f" when processing revision {hash_to_hex(revision.id)}"
-                    f" (date {revision.date})"
-                )
-                current.invalidate()
-
-            # Pre-query all known dates for directories in the current directory
-            # for the provenance object to have them cached and (potentially) improve
-            # performance.
-            current.entry.retrieve_children(archive)
-            ddates = provenance.directory_get_dates_in_isochrone_frontier(
-                current.entry.dirs
-            )
-            for dir in current.entry.dirs:
-                # Recursively analyse subdirectory nodes
-                node = current.add_directory(dir, date=ddates.get(dir.id, None))
-                stack.append(node)
-
-            fdates.update(provenance.content_get_early_dates(current.entry.files))
-
-    logging.debug(
-        f"Isochrone graph for revision {hash_to_hex(revision.id)} successfully created!"
-    )
-    # Precalculate max known date for each node in the graph (only directory nodes are
-    # pushed to the stack).
-    logging.debug(f"Computing maxdates for revision {hash_to_hex(revision.id)}...")
-    stack = [root]
-
-    while stack:
-        current = stack.pop()
-        # Current directory node is known if it already has an assigned date (ie. it was
-        # already seen as an isochrone frontier).
-        if current.known:
-            assert current.maxdate is None
-            current.maxdate = current.dbdate
-        else:
-            if any(x.maxdate is None for x in current.children):
-                # at least one child of current has no maxdate yet
-                # Current node needs to be analysed again after its children.
-                stack.append(current)
-                for child in current.children:
-                    if child.maxdate is None:
-                        # if child.maxdate is None, it must be processed
-                        stack.append(child)
-            else:
-                # all the files and directories under current have a maxdate,
-                # we can infer the maxdate for current directory
-                assert current.maxdate is None
-                # if all content is already known, update current directory info.
-                current.maxdate = max(
-                    [UTCMIN]
-                    + [
-                        child.maxdate
-                        for child in current.children
-                        if child.maxdate is not None  # unnecessary, but needed for mypy
-                    ]
-                    + [
-                        fdates.get(file.id, revision.date)
-                        for file in current.entry.files
-                    ]
-                )
-                if current.maxdate <= revision.date:
-                    current.known = (
-                        # true if all subdirectories are known
-                        all(child.known for child in current.children)
-                        # true if all files are in fdates, i.e. if all files were known
-                        # *before building this isochrone graph node*
-                        # Note: the 'all()' is lazy: will stop iterating as soon as
-                        # possible
-                        and all((file.id in fdates) for file in current.entry.files)
-                    )
-                else:
-                    # at least one content is being processed out-of-order, then current
-                    # node should be treated as unknown
-                    current.maxdate = revision.date
-                    current.known = False
-    logging.debug(
-        f"Maxdates for revision {hash_to_hex(revision.id)} successfully computed!"
-    )
-    return root
-
-
-def revision_process_content(
-    archive: ArchiveInterface,
-    provenance: ProvenanceInterface,
-    revision: RevisionEntry,
-    graph: IsochroneNode,
-    trackall: bool = True,
-    lower: bool = True,
-    mindepth: int = 1,
-):
-    assert revision.date is not None
-    provenance.revision_add(revision)
-
-    stack = [graph]
-    while stack:
-        current = stack.pop()
-        if current.dbdate is not None:
-            assert current.dbdate <= revision.date
-            if trackall:
-                # Current directory is an outer isochrone frontier for a previously
-                # processed revision. It should be reused as is.
-                provenance.directory_add_to_revision(
-                    revision, current.entry, current.path
-                )
-        else:
-            assert current.maxdate is not None
-            # Current directory is not an outer isochrone frontier for any previous
-            # revision. It might be eligible for this one.
-            if is_new_frontier(
-                current,
-                revision=revision,
-                trackall=trackall,
-                lower=lower,
-                mindepth=mindepth,
-            ):
-                # Outer frontier should be moved to current position in the isochrone
-                # graph. This is the first time this directory is found in the isochrone
-                # frontier.
-                provenance.directory_set_date_in_isochrone_frontier(
-                    current.entry, current.maxdate
-                )
-                if trackall:
-                    provenance.directory_add_to_revision(
-                        revision, current.entry, current.path
-                    )
-                    flatten_directory(archive, provenance, current.entry)
-            else:
-                # If current node is an invalidated frontier, update its date for future
-                # revisions to get the proper value.
-                if current.invalid:
-                    provenance.directory_set_date_in_isochrone_frontier(
-                        current.entry, current.maxdate
-                    )
-                # No point moving the frontier here. Either there are no files or they
-                # are being seen for the first time here. Add all blobs to current
-                # revision updating date if necessary, and recursively analyse
-                # subdirectories as candidates to the outer frontier.
-                for blob in current.entry.files:
-                    date = provenance.content_get_early_date(blob)
-                    if date is None or revision.date < date:
-                        provenance.content_set_early_date(blob, revision.date)
-                    provenance.content_add_to_revision(revision, blob, current.path)
-                for child in current.children:
-                    stack.append(child)
-
-
-def is_new_frontier(
-    node: IsochroneNode,
-    revision: RevisionEntry,
-    trackall: bool = True,
-    lower: bool = True,
-    mindepth: int = 1,
-) -> bool:
-    assert node.maxdate is not None  # for mypy
-    assert revision.date is not None  # idem
-    if trackall:
-        # The only real condition for a directory to be a frontier is that its
-        # content is already known and its maxdate is less (or equal) than
-        # current revision's date. Checking mindepth is meant to skip root
-        # directories (or any arbitrary depth) to improve the result. The
-        # option lower tries to maximize the reusage rate of previously defined
-        # frontiers by keeping them low in the directory tree.
-        return (
-            node.known
-            and node.maxdate <= revision.date  # all content is earlier than revision
-            and node.depth
-            >= mindepth  # current node is deeper than the min allowed depth
-            and (has_blobs(node) if lower else True)  # there is at least one blob in it
-        )
-    else:
-        # If we are only tracking first occurrences, we want to ensure that all first
-        # occurrences end up in the content_early_in_rev relation. Thus, we force for
-        # every blob outside a frontier to have an extrictly earlier date.
-        return (
-            node.maxdate < revision.date  # all content is earlier than revision
-            and node.depth >= mindepth  # deeper than the min allowed depth
-            and (has_blobs(node) if lower else True)  # there is at least one blob
-        )
-
-
-def has_blobs(node: IsochroneNode) -> bool:
-    # We may want to look for files in different ways to decide whether to define a
-    # frontier or not:
-    # 1. Only files in current node:
-    return any(node.entry.files)
-    # 2. Files anywhere in the isochrone graph
-    # stack = [node]
-    # while stack:
-    #     current = stack.pop()
-    #     if any(
-    #         map(lambda child: isinstance(child.entry, FileEntry), current.children)):
-    #         return True
-    #     else:
-    #         # All children are directory entries.
-    #         stack.extend(current.children)
-    # return False
-    # 3. Files in the intermediate directories between current node and any previously
-    #    defined frontier:
-    # TODO: complete this case!
-    # return any(
-    #     map(lambda child: isinstance(child.entry, FileEntry), node.children)
-    # ) or all(
-    #     map(
-    #         lambda child: (
-    #             not (isinstance(child.entry, DirectoryEntry) and child.date is None)
-    #         )
-    #         or has_blobs(child),
-    #         node.children,
-    #     )
-    # )
diff --git a/swh/provenance/revision.py b/swh/provenance/revision.py
--- a/swh/provenance/revision.py
+++ b/swh/provenance/revision.py
@@ -1,14 +1,18 @@
 from datetime import datetime, timezone
 from itertools import islice
-from typing import Iterable, Iterator, Optional, Tuple
+import logging
+import os
+import time
+from typing import Iterable, Iterator, List, Optional, Tuple

 import iso8601

-from swh.model.hashutil import hash_to_bytes
-from swh.provenance.model import RevisionEntry
+from swh.model.hashutil import hash_to_bytes, hash_to_hex

-########################################################################################
-########################################################################################
+from .archive import ArchiveInterface
+from .graph import IsochroneNode, build_isochrone_graph
+from .model import DirectoryEntry, RevisionEntry
+from .provenance import ProvenanceInterface


 class CSVRevisionIterator:
@@ -48,3 +52,204 @@
             date=date,
             root=hash_to_bytes(root),
         )
+
+
+def revision_add(
+    provenance: ProvenanceInterface,
+    archive: ArchiveInterface,
+    revisions: List[RevisionEntry],
+    trackall: bool = True,
+    lower: bool = True,
+    mindepth: int = 1,
+    commit: bool = True,
+) -> None:
+    start = time.time()
+    for revision in revisions:
+        assert revision.date is not None
+        assert revision.root is not None
+        # Process content starting from the revision's root directory.
+        date = provenance.revision_get_early_date(revision)
+        if date is None or revision.date < date:
+            logging.debug(
+                f"Processing revision {hash_to_hex(revision.id)}"
+                f" (known date {date} / revision date {revision.date})..."
+            )
+            graph = build_isochrone_graph(
+                archive,
+                provenance,
+                revision,
+                DirectoryEntry(revision.root),
+            )
+            # TODO: add file size filtering
+            revision_process_content(
+                archive,
+                provenance,
+                revision,
+                graph,
+                trackall=trackall,
+                lower=lower,
+                mindepth=mindepth,
+            )
+    done = time.time()
+    if commit:
+        provenance.commit()
+    stop = time.time()
+    logging.debug(
+        f"Revisions {';'.join([hash_to_hex(revision.id) for revision in revisions])} "
+        f"were processed in {stop - start} secs (commit took {stop - done} secs)!"
+    )
+    # logging.critical(
+    #     ";".join([hash_to_hex(revision.id) for revision in revisions])
+    #     + f",{stop - start},{stop - done}"
+    # )
+
+
+def revision_process_content(
+    archive: ArchiveInterface,
+    provenance: ProvenanceInterface,
+    revision: RevisionEntry,
+    graph: IsochroneNode,
+    trackall: bool = True,
+    lower: bool = True,
+    mindepth: int = 1,
+):
+    assert revision.date is not None
+    provenance.revision_add(revision)
+
+    stack = [graph]
+    while stack:
+        current = stack.pop()
+        if current.dbdate is not None:
+            assert current.dbdate <= revision.date
+            if trackall:
+                # Current directory is an outer isochrone frontier for a previously
+                # processed revision. It should be reused as is.
+                provenance.directory_add_to_revision(
+                    revision, current.entry, current.path
+                )
+        else:
+            assert current.maxdate is not None
+            # Current directory is not an outer isochrone frontier for any previous
+            # revision. It might be eligible for this one.
+            if is_new_frontier(
+                current,
+                revision=revision,
+                trackall=trackall,
+                lower=lower,
+                mindepth=mindepth,
+            ):
+                # The outer frontier should be moved to the current position in the
+                # isochrone graph. This is the first time this directory is found in
+                # the isochrone frontier.
+                provenance.directory_set_date_in_isochrone_frontier(
+                    current.entry, current.maxdate
+                )
+                if trackall:
+                    provenance.directory_add_to_revision(
+                        revision, current.entry, current.path
+                    )
+                    flatten_directory(archive, provenance, current.entry)
+            else:
+                # If the current node is an invalidated frontier, update its date for
+                # future revisions to get the proper value.
+                if current.invalid:
+                    provenance.directory_set_date_in_isochrone_frontier(
+                        current.entry, current.maxdate
+                    )
+                # No point moving the frontier here. Either there are no files or they
+                # are being seen for the first time here. Add all blobs to the current
+                # revision, updating dates if necessary, and recursively analyse
+                # subdirectories as candidates for the outer frontier.
+                for blob in current.entry.files:
+                    date = provenance.content_get_early_date(blob)
+                    if date is None or revision.date < date:
+                        provenance.content_set_early_date(blob, revision.date)
+                    provenance.content_add_to_revision(revision, blob, current.path)
+                for child in current.children:
+                    stack.append(child)
+
+
+def flatten_directory(
+    archive: ArchiveInterface,
+    provenance: ProvenanceInterface,
+    directory: DirectoryEntry,
+) -> None:
+    """Recursively retrieve all the files of 'directory' and insert them in the
+    'provenance' database in the 'content_to_directory' table.
+    """
+    stack = [(directory, b"")]
+    while stack:
+        current, prefix = stack.pop()
+        current.retrieve_children(archive)
+        for f_child in current.files:
+            # Add content to the directory with the computed prefix.
+            provenance.content_add_to_directory(directory, f_child, prefix)
+        for d_child in current.dirs:
+            # Recursively walk the child directory.
+            stack.append((d_child, os.path.join(prefix, d_child.name)))
+
+
+def is_new_frontier(
+    node: IsochroneNode,
+    revision: RevisionEntry,
+    trackall: bool = True,
+    lower: bool = True,
+    mindepth: int = 1,
+) -> bool:
+    assert node.maxdate is not None  # for mypy
+    assert revision.date is not None  # idem
+    if trackall:
+        # The only real condition for a directory to be a frontier is that its
+        # content is already known and its maxdate is less than (or equal to) the
+        # current revision's date. Checking mindepth is meant to skip root
+        # directories (or any arbitrary depth) to improve the result. The
+        # option lower tries to maximize the reuse rate of previously defined
+        # frontiers by keeping them low in the directory tree.
+        return (
+            node.known
+            and node.maxdate <= revision.date  # all content is earlier than revision
+            and node.depth
+            >= mindepth  # current node is deeper than the min allowed depth
+            and (has_blobs(node) if lower else True)  # there is at least one blob in it
+        )
+    else:
+        # If we are only tracking first occurrences, we want to ensure that all first
+        # occurrences end up in the content_early_in_rev relation. Thus, we force
+        # every blob outside a frontier to have a strictly earlier date.
+        return (
+            node.maxdate < revision.date  # all content is earlier than revision
+            and node.depth >= mindepth  # deeper than the min allowed depth
+            and (has_blobs(node) if lower else True)  # there is at least one blob
+        )
+
+
+def has_blobs(node: IsochroneNode) -> bool:
+    # We may want to look for files in different ways to decide whether to define a
+    # frontier or not:
+    # 1. Only files in current node:
+    return any(node.entry.files)
+    # 2. Files anywhere in the isochrone graph
+    # stack = [node]
+    # while stack:
+    #     current = stack.pop()
+    #     if any(
+    #         map(lambda child: isinstance(child.entry, FileEntry), current.children)):
+    #         return True
+    #     else:
+    #         # All children are directory entries.
+    #         stack.extend(current.children)
+    # return False
+    # 3. Files in the intermediate directories between current node and any previously
+    #    defined frontier:
+    # TODO: complete this case!
+    # return any(
+    #     map(lambda child: isinstance(child.entry, FileEntry), node.children)
+    # ) or all(
+    #     map(
+    #         lambda child: (
+    #             not (isinstance(child.entry, DirectoryEntry) and child.date is None)
+    #         )
+    #         or has_blobs(child),
+    #         node.children,
+    #     )
+    # )
diff --git a/swh/provenance/tests/test_isochrone_graph.py b/swh/provenance/tests/test_isochrone_graph.py
--- a/swh/provenance/tests/test_isochrone_graph.py
+++ b/swh/provenance/tests/test_isochrone_graph.py
@@ -10,8 +10,9 @@
 import yaml

 from swh.model.hashutil import hash_to_bytes
+from swh.provenance.graph import IsochroneNode, build_isochrone_graph
 from swh.provenance.model import DirectoryEntry, RevisionEntry
-from swh.provenance.provenance import IsochroneNode, build_isochrone_graph, revision_add
+from swh.provenance.revision import revision_add
 from swh.provenance.tests.conftest import fill_storage, get_datafile, load_repo_data
 from swh.provenance.tests.test_provenance_db import ts2dt

diff --git a/swh/provenance/tests/test_provenance_db.py b/swh/provenance/tests/test_provenance_db.py
--- a/swh/provenance/tests/test_provenance_db.py
+++ b/swh/provenance/tests/test_provenance_db.py
@@ -6,8 +6,8 @@
 import datetime

 from swh.model.tests.swh_model_data import TEST_OBJECTS
-from swh.provenance.origin import OriginEntry
-from swh.provenance.provenance import origin_add
+from swh.provenance.model import OriginEntry
+from swh.provenance.origin import origin_add
 from swh.provenance.storage.archive import ArchiveStorage

diff --git a/swh/provenance/tests/test_provenance_heuristics.py b/swh/provenance/tests/test_provenance_heuristics.py
--- a/swh/provenance/tests/test_provenance_heuristics.py
+++ b/swh/provenance/tests/test_provenance_heuristics.py
@@ -8,7 +8,7 @@
 import pytest

 from swh.provenance.model import RevisionEntry
-from swh.provenance.provenance import revision_add
+from swh.provenance.revision import revision_add
 from swh.provenance.tests.conftest import (
     fill_storage,
     get_datafile,
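
For reference, a minimal, self-contained sketch of the post-order maxdate pass that build_isochrone_graph performs in its second step. This is not part of the patch: the toy Node class below is a hypothetical stand-in for IsochroneNode, with plain datetimes in place of the DB-backed dates and without the known/invalid bookkeeping.

from datetime import datetime, timezone
from typing import Iterable, List, Optional

UTCMIN = datetime.min.replace(tzinfo=timezone.utc)


class Node:
    """Hypothetical stand-in for IsochroneNode: a directory with an optional
    already-known date (dbdate), the dates of the files it contains, and its
    subdirectories."""

    def __init__(
        self,
        dbdate: Optional[datetime] = None,
        file_dates: Iterable[datetime] = (),
        children: Iterable["Node"] = (),
    ):
        self.dbdate = dbdate  # date already present in the "DB", if any
        self.maxdate: Optional[datetime] = None
        self.file_dates: List[datetime] = list(file_dates)
        self.children: List[Node] = list(children)


def compute_maxdates(root: Node) -> None:
    # Iterative post-order traversal, as in build_isochrone_graph: a node is
    # revisited once all of its children have a maxdate; its own maxdate is
    # then the max over its children's maxdates and its files' dates.
    stack = [root]
    while stack:
        current = stack.pop()
        if current.dbdate is not None:
            current.maxdate = current.dbdate  # known node: reuse the DB date
        elif any(child.maxdate is None for child in current.children):
            stack.append(current)  # analyse again after the children
            stack.extend(c for c in current.children if c.maxdate is None)
        else:
            # all children are resolved; UTCMIN handles empty directories
            current.maxdate = max(
                [UTCMIN]
                + [c.maxdate for c in current.children if c.maxdate is not None]
                + current.file_dates
            )


# Two leaf directories with files seen in 2019 and 2020 give the root a
# maxdate of 2020, the latest date of any content below it.
d2019 = datetime(2019, 1, 1, tzinfo=timezone.utc)
d2020 = datetime(2020, 1, 1, tzinfo=timezone.utc)
root = Node(children=[Node(file_dates=[d2019]), Node(file_dates=[d2020])])
compute_maxdates(root)
assert root.maxdate == d2020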