diff --git a/swh/provenance/cli.py b/swh/provenance/cli.py index 7da3d0c..83c8ea2 100644 --- a/swh/provenance/cli.py +++ b/swh/provenance/cli.py @@ -1,201 +1,203 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # WARNING: do not import unnecessary things here to keep cli startup time under # control import os from typing import Any, Dict, Optional import click import yaml from swh.core import config from swh.core.cli import CONTEXT_SETTINGS from swh.core.cli import swh as swh_cli_group from swh.model.hashutil import hash_to_bytes, hash_to_hex # All generic config code should reside in swh.core.config CONFIG_ENVVAR = "SWH_CONFIG_FILE" DEFAULT_CONFIG_PATH = os.path.join(click.get_app_dir("swh"), "global.yml") DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, DEFAULT_CONFIG_PATH) DEFAULT_CONFIG: Dict[str, Any] = { "archive": { "cls": "api", "storage": { "cls": "remote", "url": "http://uffizi.internal.softwareheritage.org:5002", } # "cls": "direct", # "db": { # "host": "db.internal.softwareheritage.org", # "dbname": "softwareheritage", # "user": "guest" # } }, "provenance": {"cls": "local", "db": {"host": "localhost", "dbname": "provenance"}}, } CONFIG_FILE_HELP = f"""Configuration file: \b The CLI option or the environment variable will fail if invalid. CLI option is checked first. Then, environment variable {CONFIG_ENVVAR} is checked. Then, if cannot load the default path, a set of default values are used. Default config path is {DEFAULT_CONFIG_PATH}. Default config values are: \b {yaml.dump(DEFAULT_CONFIG)}""" PROVENANCE_HELP = f"""Software Heritage Scanner tools. 
{CONFIG_FILE_HELP}""" @swh_cli_group.group( name="provenance", context_settings=CONTEXT_SETTINGS, help=PROVENANCE_HELP ) @click.option( "-C", "--config-file", default=None, type=click.Path(exists=False, dir_okay=False, path_type=str), help="""YAML configuration file.""", ) @click.option( "-P", "--profile", default=None, type=click.Path(exists=False, dir_okay=False, path_type=str), help="""Enable profiling to specified file.""", ) @click.pass_context def cli(ctx, config_file: Optional[str], profile: str): if config_file is None and config.config_exists(DEFAULT_PATH): config_file = DEFAULT_PATH if config_file is None: conf = DEFAULT_CONFIG else: # read_raw_config do not fail on ENOENT if not config.config_exists(config_file): raise FileNotFoundError(config_file) conf = config.read_raw_config(config.config_basepath(config_file)) conf = config.merge_configs(DEFAULT_CONFIG, conf) ctx.ensure_object(dict) ctx.obj["config"] = conf if profile: import atexit import cProfile print("Profiling...") pr = cProfile.Profile() pr.enable() def exit(): pr.disable() pr.dump_stats(profile) atexit.register(exit) @cli.command(name="create", deprecated=True) @click.option("--maintenance-db", default=None) @click.option("--drop/--no-drop", "drop_db", default=False) @click.pass_context def create(ctx, maintenance_db, drop_db): """Deprecated, please use: swh db create provenance and swh db init provenance instead. """ @cli.command(name="iter-revisions") @click.argument("filename") @click.option("-l", "--limit", type=int) +@click.option("-m", "--min-depth", default=1, type=int) +@click.option("-r", "--reuse", default=True, type=bool) @click.pass_context -def iter_revisions(ctx, filename, limit): +def iter_revisions(ctx, filename, limit, min_depth, reuse): # TODO: add file size filtering """Process a provided list of revisions.""" from . 
import get_archive, get_provenance from .provenance import revision_add from .revision import CSVRevisionIterator archive = get_archive(**ctx.obj["config"]["archive"]) provenance = get_provenance(**ctx.obj["config"]["provenance"]) revisions_provider = ( line.strip().split(",") for line in open(filename, "r") if line.strip() ) revisions = CSVRevisionIterator(revisions_provider, archive, limit=limit) for revision in revisions: - revision_add(provenance, archive, revision) + revision_add(provenance, archive, revision, lower=reuse, mindepth=min_depth) @cli.command(name="iter-origins") @click.argument("filename") @click.option("-l", "--limit", type=int) @click.pass_context def iter_origins(ctx, filename, limit): """Process a provided list of origins.""" from . import get_archive, get_provenance from .origin import FileOriginIterator from .provenance import origin_add archive = get_archive(**ctx.obj["config"]["archive"]) provenance = get_provenance(**ctx.obj["config"]["provenance"]) for origin in FileOriginIterator(filename, archive, limit=limit): origin_add(provenance, origin) @cli.command(name="find-first") @click.argument("swhid") @click.pass_context def find_first(ctx, swhid): """Find first occurrence of the requested blob.""" from . 
import get_provenance provenance = get_provenance(**ctx.obj["config"]["provenance"]) # TODO: return a dictionary with proper keys for each field row = provenance.content_find_first(hash_to_bytes(swhid)) if row is not None: print( "swh:1:cnt:{cnt}, swh:1:rev:{rev}, {date}, {path}".format( cnt=hash_to_hex(row[0]), rev=hash_to_hex(row[1]), date=row[2], path=os.fsdecode(row[3]), ) ) else: print(f"Cannot find a content with the id {swhid}") @cli.command(name="find-all") @click.argument("swhid") @click.option("-l", "--limit", type=int) @click.pass_context def find_all(ctx, swhid, limit): """Find all occurrences of the requested blob.""" from swh.provenance import get_provenance provenance = get_provenance(**ctx.obj["config"]["provenance"]) # TODO: return a dictionary with proper keys for each field for row in provenance.content_find_all(hash_to_bytes(swhid), limit=limit): print( "swh:1:cnt:{cnt}, swh:1:rev:{rev}, {date}, {path}".format( cnt=hash_to_hex(row[0]), rev=hash_to_hex(row[1]), date=row[2], path=os.fsdecode(row[3]), ) ) diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py index f48385f..a902483 100644 --- a/swh/provenance/provenance.py +++ b/swh/provenance/provenance.py @@ -1,390 +1,394 @@ from datetime import datetime import os import pytz from typing import Dict, Generator, List, Optional, Tuple from typing_extensions import Protocol, runtime_checkable from .archive import ArchiveInterface from .model import DirectoryEntry, FileEntry, TreeEntry from .origin import OriginEntry from .revision import RevisionEntry @runtime_checkable class ProvenanceInterface(Protocol): def commit(self): """Commit currently ongoing transactions in the backend DB""" ... def content_add_to_directory( self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes ) -> None: ... def content_add_to_revision( self, revision: RevisionEntry, blob: FileEntry, prefix: bytes ) -> None: ... 
def content_find_first( self, blobid: bytes ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]: ... def content_find_all( self, blobid: bytes, limit: Optional[int] = None ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]: ... def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: ... def content_get_early_dates(self, blobs: List[FileEntry]) -> Dict[bytes, datetime]: ... def content_set_early_date(self, blob: FileEntry, date: datetime) -> None: ... def directory_add_to_revision( self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes ) -> None: ... def directory_get_date_in_isochrone_frontier( self, directory: DirectoryEntry ) -> Optional[datetime]: ... def directory_get_dates_in_isochrone_frontier( self, dirs: List[DirectoryEntry] ) -> Dict[bytes, datetime]: ... def directory_invalidate_in_isochrone_frontier( self, directory: DirectoryEntry ) -> None: ... def directory_set_date_in_isochrone_frontier( self, directory: DirectoryEntry, date: datetime ) -> None: ... def origin_get_id(self, origin: OriginEntry) -> int: ... def revision_add(self, revision: RevisionEntry) -> None: ... def revision_add_before_revision( self, relative: RevisionEntry, revision: RevisionEntry ) -> None: ... def revision_add_to_origin( self, origin: OriginEntry, revision: RevisionEntry ) -> None: ... def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]: ... def revision_get_preferred_origin(self, revision: RevisionEntry) -> int: ... def revision_in_history(self, revision: RevisionEntry) -> bool: ... def revision_set_preferred_origin( self, origin: OriginEntry, revision: RevisionEntry ) -> None: ... def revision_visited(self, revision: RevisionEntry) -> bool: ... 
def directory_process_content( provenance: ProvenanceInterface, directory: DirectoryEntry, relative: DirectoryEntry ) -> None: stack = [(directory, b"")] while stack: current, prefix = stack.pop() for child in iter(current): if isinstance(child, FileEntry): # Add content to the relative directory with the computed prefix. provenance.content_add_to_directory(relative, child, prefix) else: # Recursively walk the child directory. stack.append((child, os.path.join(prefix, child.name))) def origin_add(provenance: ProvenanceInterface, origin: OriginEntry) -> None: # TODO: refactor to iterate over origin visit statuses and commit only once # per status. origin.id = provenance.origin_get_id(origin) for revision in origin.revisions: origin_add_revision(provenance, origin, revision) # Commit after each revision provenance.commit() # TODO: verify this! def origin_add_revision( provenance: ProvenanceInterface, origin: OriginEntry, revision: RevisionEntry ) -> None: stack: List[Tuple[Optional[RevisionEntry], RevisionEntry]] = [(None, revision)] while stack: relative, current = stack.pop() # Check if current revision has no preferred origin and update if necessary. preferred = provenance.revision_get_preferred_origin(current) if preferred is None: provenance.revision_set_preferred_origin(origin, current) ######################################################################## if relative is None: # This revision is pointed directly by the origin. visited = provenance.revision_visited(current) provenance.revision_add_to_origin(origin, current) if not visited: stack.append((current, current)) else: # This revision is a parent of another one in the history of the # relative revision. for parent in iter(current): visited = provenance.revision_visited(parent) if not visited: # The parent revision has never been seen before pointing # directly to an origin. 
known = provenance.revision_in_history(parent) if known: # The parent revision is already known in some other # revision's history. We should point it directly to # the origin and (eventually) walk its history. stack.append((None, parent)) else: # The parent revision was never seen before. We should # walk its history and associate it with the same # relative revision. provenance.revision_add_before_revision(relative, parent) stack.append((relative, parent)) else: # The parent revision already points to an origin, so its # history was properly processed before. We just need to # make sure it points to the current origin as well. provenance.revision_add_to_origin(origin, parent) def revision_add( - provenance: ProvenanceInterface, archive: ArchiveInterface, revision: RevisionEntry + provenance: ProvenanceInterface, + archive: ArchiveInterface, + revision: RevisionEntry, + lower: bool = True, + mindepth: int = 1, ) -> None: assert revision.date is not None assert revision.root is not None # Processed content starting from the revision's root directory. date = provenance.revision_get_early_date(revision) if date is None or revision.date < date: provenance.revision_add(revision) # TODO: add file size filtering revision_process_content( - provenance, revision, DirectoryEntry(archive, revision.root, b"") + provenance, + revision, + DirectoryEntry(archive, revision.root, b""), + lower=lower, + mindepth=mindepth, ) # TODO: improve this! Maybe using a max attempt counter? # Ideally Provenance class should guarantee that a commit never fails. 
while not provenance.commit(): continue class IsochroneNode: - def __init__(self, entry: TreeEntry, dates: Dict[bytes, datetime] = {}): + def __init__( + self, entry: TreeEntry, dates: Dict[bytes, datetime] = {}, depth: int = 0 + ): self.entry = entry + self.depth = depth self.date = dates.get(self.entry.id, None) self.children: List[IsochroneNode] = [] self.maxdate: Optional[datetime] = None def add_child( self, child: TreeEntry, dates: Dict[bytes, datetime] = {} ) -> "IsochroneNode": assert isinstance(self.entry, DirectoryEntry) and self.date is None - node = IsochroneNode(child, dates=dates) + node = IsochroneNode(child, dates=dates, depth=self.depth + 1) self.children.append(node) return node def build_isochrone_graph( provenance: ProvenanceInterface, revision: RevisionEntry, directory: DirectoryEntry ) -> IsochroneNode: assert revision.date is not None # Build the nodes structure root = IsochroneNode(directory) root.date = provenance.directory_get_date_in_isochrone_frontier(directory) stack = [root] while stack: current = stack.pop() assert isinstance(current.entry, DirectoryEntry) if current.date is None or current.date >= revision.date: # If current directory has an associated date in the isochrone frontier that # is greater or equal to the current revision's one, it should be ignored as # the revision is being processed out of order. if current.date is not None and current.date >= revision.date: provenance.directory_invalidate_in_isochrone_frontier(current.entry) current.date = None # Pre-query all known dates for content/directories in the current directory # for the provenance object to have them cached and (potentially) improve # performance. ddates = provenance.directory_get_dates_in_isochrone_frontier( [child for child in current.entry if isinstance(child, DirectoryEntry)] ) fdates = provenance.content_get_early_dates( [child for child in current.entry if isinstance(child, FileEntry)] ) for child in current.entry: # Recursively analyse directory nodes. 
if isinstance(child, DirectoryEntry): node = current.add_child(child, dates=ddates) stack.append(node) else: current.add_child(child, dates=fdates) # Precalculate max known date for each node in the graph. stack = [root] while stack: current = stack.pop() if current.date is None: if any(map(lambda child: child.maxdate is None, current.children)): # Current node needs to be analysed again after its children. stack.append(current) for child in current.children: if isinstance(child.entry, FileEntry): if child.date is not None: # File node that has been seen before, just use its known # date. child.maxdate = child.date else: # File node that has never been seen before, use current # revision date. child.maxdate = revision.date else: # Recursively analyse directory nodes. stack.append(child) else: maxdates = [] for child in current.children: assert child.maxdate is not None maxdates.append(child.maxdate) current.maxdate = ( max(maxdates) if maxdates else datetime.min.replace(tzinfo=pytz.UTC) ) else: # Directory node in the frontier, just use its known date. current.maxdate = current.date return root def revision_process_content( - provenance: ProvenanceInterface, revision: RevisionEntry, root: DirectoryEntry + provenance: ProvenanceInterface, + revision: RevisionEntry, + root: DirectoryEntry, + lower: bool = True, + mindepth: int = 1, ): assert revision.date is not None stack = [(build_isochrone_graph(provenance, revision, root), root.name)] while stack: current, path = stack.pop() assert isinstance(current.entry, DirectoryEntry) if current.date is not None: assert current.date < revision.date # Current directory is an outer isochrone frontier for a previously # processed revision. It should be reused as is. provenance.directory_add_to_revision(revision, current.entry, path) else: # Current directory is not an outer isochrone frontier for any previous # revision. It might be eligible for this one. 
- if is_new_frontier(current, revision): + if is_new_frontier(current, revision, lower=lower, mindepth=mindepth): assert current.maxdate is not None # Outer frontier should be moved to current position in the isochrone # graph. This is the first time this directory is found in the isochrone # frontier. provenance.directory_set_date_in_isochrone_frontier( current.entry, current.maxdate ) provenance.directory_add_to_revision(revision, current.entry, path) directory_process_content( provenance, directory=current.entry, relative=current.entry, ) else: # No point moving the frontier here. Either there are no files or they # are being seen for the first time here. Add all blobs to current # revision updating date if necessary, and recursively analyse # subdirectories as candidates to the outer frontier. for child in current.children: if isinstance(child.entry, FileEntry): blob = child.entry if child.date is None or revision.date < child.date: provenance.content_set_early_date(blob, revision.date) provenance.content_add_to_revision(revision, blob, path) else: stack.append((child, os.path.join(path, child.entry.name))) -def is_new_frontier(node: IsochroneNode, revision: RevisionEntry) -> bool: +def is_new_frontier( + node: IsochroneNode, revision: RevisionEntry, lower: bool = True, mindepth: int = 1 +) -> bool: assert node.maxdate is not None and revision.date is not None - # Using the following condition should we should get an algorithm equivalent to old - # version where frontiers are pushed up in the tree whenever possible. - # return node.maxdate < revision.date # all content in node is already known - - # Push frontiers up while forbidding them in the root directory of the revision. - # return ( - # node.maxdate < revision.date # all content in node is already known - # and node.entry.id != revision.root # it is not the root directory - # ) - - # Keep frontiers down in the directory tree with the aim of maximizing their - # reusage. 
- # return ( - # node.maxdate < revision.date # all content in node is already known - # and has_blobs(node) # there is at least one blob in it - # ) - - # Keep frontiers down and also forbid placing them in the root directory. + # The only real condition for a directory to be a frontier is that its maxdate is + # strictly less than current revision's date. Checking mindepth is meant to skip + # root directories (or any arbitrary depth) to improve the result. The option lower + # tries to maximize the reuse rate of previously defined frontiers by keeping them + # low in the directory tree. return ( node.maxdate < revision.date # all content in node is already known - and node.entry.id != revision.root # it is not the root directory - and has_blobs(node) # there is at least one blob in it + and node.depth >= mindepth # current node is at least mindepth levels deep + and (has_blobs(node) if lower else True) # if lower, at least one blob in it ) def has_blobs(node: IsochroneNode) -> bool: # We may want to look for files in different ways to decide whether to define a # frontier or not: # 1. Only files in current node: return any(map(lambda child: isinstance(child.entry, FileEntry), node.children)) # 2. Files anywhere in the isochrone graph # stack = [node] # while stack: # current = stack.pop() # if any( # map(lambda child: isinstance(child.entry, FileEntry), current.children)): # return True # else: # # All children are directory entries. # stack.extend(current.children) # return False # 3. Files in the intermediate directories between current node and any previously # defined frontier: # TODO: complete this case! # return any( # map(lambda child: isinstance(child.entry, FileEntry), node.children) # ) or all( # map( # lambda child: ( # not (isinstance(child.entry, DirectoryEntry) and child.date is None) # ) # or has_blobs(child), # node.children, # ) # )