diff --git a/swh/provenance/cli.py b/swh/provenance/cli.py index 15f9396..b1abe28 100644 --- a/swh/provenance/cli.py +++ b/swh/provenance/cli.py @@ -1,663 +1,682 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # WARNING: do not import unnecessary things here to keep cli startup time under # control from datetime import datetime, timezone from functools import partial import os from typing import Any, Dict, Generator, Optional, Tuple import click from deprecated import deprecated import iso8601 import yaml try: from systemd.daemon import notify except ImportError: notify = None from swh.core import config from swh.core.cli import CONTEXT_SETTINGS from swh.core.cli import swh as swh_cli_group from swh.model.hashutil import hash_to_bytes, hash_to_hex from swh.model.model import Sha1Git # All generic config code should reside in swh.core.config CONFIG_ENVVAR = "SWH_CONFIG_FILENAME" DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, None) DEFAULT_CONFIG: Dict[str, Any] = { "provenance": { "archive": { # Storage API based Archive object # "cls": "api", # "storage": { # "cls": "remote", # "url": "http://uffizi.internal.softwareheritage.org:5002", # } # Direct access Archive object "cls": "direct", "db": { "host": "belvedere.internal.softwareheritage.org", "port": 5432, "dbname": "softwareheritage", "user": "guest", }, }, "storage": { # Local PostgreSQL Storage # "cls": "postgresql", # "db": { # "host": "localhost", # "user": "postgres", # "password": "postgres", # "dbname": "provenance", # }, # Remote RabbitMQ/PostgreSQL Storage "cls": "rabbitmq", "url": "amqp://localhost:5672/%2f", "storage_config": { "cls": "postgresql", "db": { "host": "localhost", "user": "postgres", "password": "postgres", "dbname": "provenance", }, }, "batch_size": 100, "prefetch_count": 100, }, } } CONFIG_FILE_HELP = f""" \b Configuration can be loaded from a yaml file given either as --config-file option or the {CONFIG_ENVVAR} environment variable. If no configuration file is specified, use the following default configuration:: \b {yaml.dump(DEFAULT_CONFIG)}""" PROVENANCE_HELP = f"""Software Heritage provenance index database tools {CONFIG_FILE_HELP} """ @swh_cli_group.group( name="provenance", context_settings=CONTEXT_SETTINGS, help=PROVENANCE_HELP ) @click.option( "-C", "--config-file", default=None, type=click.Path(exists=True, dir_okay=False, path_type=str), help="""YAML configuration file.""", ) @click.option( "-P", "--profile", default=None, type=click.Path(exists=False, dir_okay=False, path_type=str), help="""Enable profiling to specified file.""", ) @click.pass_context def cli(ctx: click.core.Context, config_file: Optional[str], profile: str) -> None: if ( config_file is None and DEFAULT_PATH is not None and config.config_exists(DEFAULT_PATH) ): config_file = DEFAULT_PATH if config_file is None: conf = DEFAULT_CONFIG else: # read_raw_config do not fail on ENOENT if not os.path.exists(config_file): raise FileNotFoundError(config_file) conf = yaml.safe_load(open(config_file, "rb")) ctx.ensure_object(dict) ctx.obj["config"] = conf if profile: import atexit import cProfile print("Profiling...") pr = cProfile.Profile() pr.enable() def exit() -> None: pr.disable() pr.dump_stats(profile) atexit.register(exit) @cli.group(name="origin") @click.pass_context def origin(ctx: click.core.Context): from . 
import get_archive, get_provenance archive = get_archive(**ctx.obj["config"]["provenance"]["archive"]) provenance = get_provenance(**ctx.obj["config"]["provenance"]["storage"]) ctx.obj["provenance"] = provenance ctx.obj["archive"] = archive @origin.command(name="from-csv") @click.argument("filename", type=click.Path(exists=True)) @click.option( "-l", "--limit", type=int, help="""Limit the amount of entries (origins) to read from the input file.""", ) @click.pass_context def origin_from_csv(ctx: click.core.Context, filename: str, limit: Optional[int]): from .origin import CSVOriginIterator, origin_add provenance = ctx.obj["provenance"] archive = ctx.obj["archive"] origins_provider = generate_origin_tuples(filename) origins = CSVOriginIterator(origins_provider, limit=limit) with provenance: for origin in origins: origin_add(provenance, archive, [origin]) @origin.command(name="from-journal") @click.pass_context def origin_from_journal(ctx: click.core.Context): from swh.journal.client import get_journal_client from .journal_client import process_journal_origins provenance = ctx.obj["provenance"] archive = ctx.obj["archive"] journal_cfg = ctx.obj["config"].get("journal_client", {}) worker_fn = partial( process_journal_origins, archive=archive, provenance=provenance, ) cls = journal_cfg.pop("cls", None) or "kafka" client = get_journal_client( cls, **{ **journal_cfg, "object_types": ["origin_visit_status"], }, ) if notify: notify("READY=1") try: with provenance: client.process(worker_fn) except KeyboardInterrupt: ctx.exit(0) else: print("Done.") finally: if notify: notify("STOPPING=1") client.close() @cli.group(name="revision") @click.pass_context def revision(ctx: click.core.Context): from . import get_archive, get_provenance archive = get_archive(**ctx.obj["config"]["provenance"]["archive"]) provenance = get_provenance(**ctx.obj["config"]["provenance"]["storage"]) ctx.obj["provenance"] = provenance ctx.obj["archive"] = archive @revision.command(name="from-csv") @click.argument("filename", type=click.Path(exists=True)) @click.option( "-a", "--track-all", default=True, type=bool, help="""Index all occurrences of files in the development history.""", ) @click.option( "-f", "--flatten", default=True, type=bool, help="""Create flat models for directories in the isochrone frontier.""", ) @click.option( "-l", "--limit", type=int, help="""Limit the amount of entries (revisions) to read from the input file.""", ) @click.option( "-m", "--min-depth", default=1, type=int, help="""Set minimum depth (in the directory tree) at which an isochrone """ """frontier can be defined.""", ) @click.option( "-r", "--reuse", default=True, type=bool, help="""Prioritize the usage of previously defined isochrone frontiers """ """whenever possible.""", ) @click.option( "-s", "--min-size", default=0, type=int, help="""Set the minimum size (in bytes) of files to be indexed. 
""" """Any smaller file will be ignored.""", ) +@click.option( + "-d", + "--max-directory-size", + default=0, + type=int, + help="""Set the maximum recursive directory size of revisions to be indexed.""", +) @click.pass_context def revision_from_csv( ctx: click.core.Context, filename: str, track_all: bool, flatten: bool, limit: Optional[int], min_depth: int, reuse: bool, min_size: int, + max_directory_size: int, ) -> None: from .revision import CSVRevisionIterator, revision_add provenance = ctx.obj["provenance"] archive = ctx.obj["archive"] revisions_provider = generate_revision_tuples(filename) revisions = CSVRevisionIterator(revisions_provider, limit=limit) with provenance: for revision in revisions: revision_add( provenance, archive, [revision], trackall=track_all, flatten=flatten, lower=reuse, mindepth=min_depth, minsize=min_size, + max_directory_size=max_directory_size, ) @revision.command(name="from-journal") @click.option( "-a", "--track-all", default=True, type=bool, help="""Index all occurrences of files in the development history.""", ) @click.option( "-f", "--flatten", default=True, type=bool, help="""Create flat models for directories in the isochrone frontier.""", ) @click.option( "-l", "--limit", type=int, help="""Limit the amount of entries (revisions) to read from the input file.""", ) @click.option( "-m", "--min-depth", default=1, type=int, help="""Set minimum depth (in the directory tree) at which an isochrone """ """frontier can be defined.""", ) @click.option( "-r", "--reuse", default=True, type=bool, help="""Prioritize the usage of previously defined isochrone frontiers """ """whenever possible.""", ) @click.option( "-s", "--min-size", default=0, type=int, help="""Set the minimum size (in bytes) of files to be indexed. """ """Any smaller file will be ignored.""", ) +@click.option( + "-d", + "--max-directory-size", + default=0, + type=int, + help="""Set the maximum recursive directory size of revisions to be indexed.""", +) @click.pass_context def revision_from_journal( ctx: click.core.Context, track_all: bool, flatten: bool, limit: Optional[int], min_depth: int, reuse: bool, min_size: int, + max_directory_size: int, ) -> None: from swh.journal.client import get_journal_client from .journal_client import process_journal_revisions provenance = ctx.obj["provenance"] archive = ctx.obj["archive"] journal_cfg = ctx.obj["config"].get("journal_client", {}) worker_fn = partial( process_journal_revisions, archive=archive, provenance=provenance, + minsize=min_size, + max_directory_size=max_directory_size, ) cls = journal_cfg.pop("cls", None) or "kafka" client = get_journal_client( cls, **{ **journal_cfg, "object_types": ["revision"], }, ) if notify: notify("READY=1") try: with provenance: client.process(worker_fn) except KeyboardInterrupt: ctx.exit(0) else: print("Done.") finally: if notify: notify("STOPPING=1") client.close() @cli.group(name="directory") @click.pass_context def directory(ctx: click.core.Context): from . 
import get_archive, get_provenance archive = get_archive(**ctx.obj["config"]["provenance"]["archive"]) provenance = get_provenance(**ctx.obj["config"]["provenance"]["storage"]) ctx.obj["provenance"] = provenance ctx.obj["archive"] = archive @directory.command(name="flatten") @click.option( "--range-from", type=str, help="start ID of the range of directories to flatten" ) @click.option( "--range-to", type=str, help="stop ID of the range of directories to flatten" ) @click.option( "-s", "--min-size", default=0, type=int, help="""Set the minimum size (in bytes) of files to be indexed. Any smaller file will be ignored.""", ) @click.pass_context def directory_flatten(ctx: click.core.Context, range_from, range_to, min_size): from swh.provenance.directory import directory_flatten_range provenance = ctx.obj["provenance"] archive = ctx.obj["archive"] directory_flatten_range( provenance, archive, hash_to_bytes(range_from), hash_to_bytes(range_to), min_size, ) # old (deprecated) commands @cli.command(name="iter-frontiers") @click.argument("filename") @click.option( "-l", "--limit", type=int, help="""Limit the amount of entries (directories) to read from the input file.""", ) @click.option( "-s", "--min-size", default=0, type=int, help="""Set the minimum size (in bytes) of files to be indexed. """ """Any smaller file will be ignored.""", ) @click.pass_context def iter_frontiers( ctx: click.core.Context, filename: str, limit: Optional[int], min_size: int, ) -> None: """Process a provided list of directories in the isochrone frontier.""" from . import get_archive, get_provenance from .directory import CSVDirectoryIterator, directory_add archive = get_archive(**ctx.obj["config"]["provenance"]["archive"]) directories_provider = generate_directory_ids(filename) directories = CSVDirectoryIterator(directories_provider, limit=limit) with get_provenance(**ctx.obj["config"]["provenance"]["storage"]) as provenance: for directory in directories: directory_add( provenance, archive, [directory], minsize=min_size, ) def generate_directory_ids( filename: str, ) -> Generator[Sha1Git, None, None]: for line in open(filename, "r"): if line.strip(): yield hash_to_bytes(line.strip()) @cli.command(name="iter-revisions") @click.argument("filename") @click.option( "-a", "--track-all", default=True, type=bool, help="""Index all occurrences of files in the development history.""", ) @click.option( "-f", "--flatten", default=True, type=bool, help="""Create flat models for directories in the isochrone frontier.""", ) @click.option( "-l", "--limit", type=int, help="""Limit the amount of entries (revisions) to read from the input file.""", ) @click.option( "-m", "--min-depth", default=1, type=int, help="""Set minimum depth (in the directory tree) at which an isochrone """ """frontier can be defined.""", ) @click.option( "-r", "--reuse", default=True, type=bool, help="""Prioritize the usage of previously defined isochrone frontiers """ """whenever possible.""", ) @click.option( "-s", "--min-size", default=0, type=int, help="""Set the minimum size (in bytes) of files to be indexed. """ """Any smaller file will be ignored.""", ) @click.pass_context def iter_revisions( ctx: click.core.Context, filename: str, track_all: bool, flatten: bool, limit: Optional[int], min_depth: int, reuse: bool, min_size: int, ) -> None: """Process a provided list of revisions.""" from . 
import get_archive, get_provenance from .revision import CSVRevisionIterator, revision_add archive = get_archive(**ctx.obj["config"]["provenance"]["archive"]) revisions_provider = generate_revision_tuples(filename) revisions = CSVRevisionIterator(revisions_provider, limit=limit) with get_provenance(**ctx.obj["config"]["provenance"]["storage"]) as provenance: for revision in revisions: revision_add( provenance, archive, [revision], trackall=track_all, flatten=flatten, lower=reuse, mindepth=min_depth, minsize=min_size, ) def generate_revision_tuples( filename: str, ) -> Generator[Tuple[Sha1Git, datetime, Sha1Git], None, None]: for line in open(filename, "r"): if line.strip(): revision, date, root = line.strip().split(",") yield ( hash_to_bytes(revision), iso8601.parse_date(date, default_timezone=timezone.utc), hash_to_bytes(root), ) @cli.command(name="iter-origins") @click.argument("filename") @click.option( "-l", "--limit", type=int, help="""Limit the amount of entries (origins) to read from the input file.""", ) @click.pass_context @deprecated(version="0.0.1", reason="Use `swh provenance origin from-csv` instead") def iter_origins(ctx: click.core.Context, filename: str, limit: Optional[int]) -> None: """Process a provided list of origins.""" from . import get_archive, get_provenance from .origin import CSVOriginIterator, origin_add archive = get_archive(**ctx.obj["config"]["provenance"]["archive"]) origins_provider = generate_origin_tuples(filename) origins = CSVOriginIterator(origins_provider, limit=limit) with get_provenance(**ctx.obj["config"]["provenance"]["storage"]) as provenance: for origin in origins: origin_add(provenance, archive, [origin]) def generate_origin_tuples(filename: str) -> Generator[Tuple[str, bytes], None, None]: for line in open(filename, "r"): if line.strip(): url, snapshot = line.strip().split(",") yield (url, hash_to_bytes(snapshot)) @cli.command(name="find-first") @click.argument("swhid") @click.pass_context def find_first(ctx: click.core.Context, swhid: str) -> None: """Find first occurrence of the requested blob.""" from . import get_provenance with get_provenance(**ctx.obj["config"]["provenance"]["storage"]) as provenance: occur = provenance.content_find_first(hash_to_bytes(swhid)) if occur is not None: print( f"swh:1:cnt:{hash_to_hex(occur.content)}, " f"swh:1:rev:{hash_to_hex(occur.revision)}, " f"{occur.date}, " f"{occur.origin}, " f"{os.fsdecode(occur.path)}" ) else: print(f"Cannot find a content with the id {swhid}") @cli.command(name="find-all") @click.argument("swhid") @click.option( "-l", "--limit", type=int, help="""Limit the amount results to be retrieved.""" ) @click.pass_context def find_all(ctx: click.core.Context, swhid: str, limit: Optional[int]) -> None: """Find all occurrences of the requested blob.""" from . 
import get_provenance with get_provenance(**ctx.obj["config"]["provenance"]["storage"]) as provenance: for occur in provenance.content_find_all(hash_to_bytes(swhid), limit=limit): print( f"swh:1:cnt:{hash_to_hex(occur.content)}, " f"swh:1:rev:{hash_to_hex(occur.revision)}, " f"{occur.date}, " f"{occur.origin}, " f"{os.fsdecode(occur.path)}" ) diff --git a/swh/provenance/graph.py b/swh/provenance/graph.py index 66704f8..aab6751 100644 --- a/swh/provenance/graph.py +++ b/swh/provenance/graph.py @@ -1,215 +1,226 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from __future__ import annotations from datetime import datetime, timezone import os from typing import Any, Dict, Optional, Set from swh.core.statsd import statsd from swh.model.model import Sha1Git from .archive import ArchiveInterface from .interface import ProvenanceInterface from .model import DirectoryEntry, RevisionEntry GRAPH_DURATION_METRIC = "swh_provenance_graph_duration_seconds" GRAPH_OPERATIONS_METRIC = "swh_provenance_graph_operations_total" UTCMIN = datetime.min.replace(tzinfo=timezone.utc) +class DirectoryTooLarge(ValueError): + pass + + class HistoryGraph: @statsd.timed(metric=GRAPH_DURATION_METRIC, tags={"method": "build_history_graph"}) def __init__( self, archive: ArchiveInterface, revision: RevisionEntry, ) -> None: self.head_id = revision.id self._nodes: Set[Sha1Git] = set() # rev -> set(parents) self._edges: Dict[Sha1Git, Set[Sha1Git]] = {} stack = {self.head_id} while stack: current = stack.pop() if current not in self._nodes: self._nodes.add(current) self._edges.setdefault(current, set()) for rev, parent in archive.revision_get_some_outbound_edges(current): self._nodes.add(rev) self._edges.setdefault(rev, set()).add(parent) stack.add(parent) # don't process nodes for which we've already retrieved outbound edges stack -= self._nodes def parent_ids(self) -> Set[Sha1Git]: """Get all the known parent ids in the current graph""" return self._nodes - {self.head_id} def __str__(self) -> str: return f" Dict[str, Any]: return { "head": self.head_id.hex(), "graph": { node.hex(): sorted(parent.hex() for parent in parents) for node, parents in self._edges.items() }, } class IsochroneNode: def __init__( self, entry: DirectoryEntry, dbdate: Optional[datetime] = None, depth: int = 0, prefix: bytes = b"", ) -> None: self.entry = entry self.depth = depth # dbdate is the maxdate for this node that comes from the DB self._dbdate: Optional[datetime] = dbdate # maxdate is set by the maxdate computation algorithm self.maxdate: Optional[datetime] = None self.invalid = False self.path = os.path.join(prefix, self.entry.name) if prefix else self.entry.name self.children: Set[IsochroneNode] = set() @property def dbdate(self) -> Optional[datetime]: # use a property to make this attribute (mostly) read-only return self._dbdate def invalidate(self) -> None: statsd.increment( metric=GRAPH_OPERATIONS_METRIC, tags={"method": "invalidate_frontier"} ) self._dbdate = None self.maxdate = None self.invalid = True def add_directory( self, child: DirectoryEntry, date: Optional[datetime] = None ) -> IsochroneNode: # we should not be processing this node (ie add subdirectories or files) if it's # actually known by the provenance DB assert self.dbdate is None node = IsochroneNode(child, dbdate=date, depth=self.depth + 1, prefix=self.path) 
self.children.add(node) return node def __str__(self) -> str: return ( f"<{self.entry}: depth={self.depth}, dbdate={self.dbdate}, " f"maxdate={self.maxdate}, invalid={self.invalid}, path={self.path!r}, " f"children=[{', '.join(str(child) for child in self.children)}]>" ) def __eq__(self, other: Any) -> bool: return isinstance(other, IsochroneNode) and self.__dict__ == other.__dict__ def __hash__(self) -> int: # only immutable attributes are considered to compute hash return hash((self.entry, self.depth, self.path)) @statsd.timed(metric=GRAPH_DURATION_METRIC, tags={"method": "build_isochrone_graph"}) def build_isochrone_graph( provenance: ProvenanceInterface, archive: ArchiveInterface, revision: RevisionEntry, directory: DirectoryEntry, minsize: int = 0, + max_directory_size: int = 0, ) -> IsochroneNode: assert revision.date is not None assert revision.root == directory.id # this function process a revision in 2 steps: # # 1. build the tree structure of IsochroneNode objects (one INode per # directory under the root directory of the revision but not following # known subdirectories), and gather the dates from the DB for already # known objects; for files, just keep all the dates in a global 'fdates' # dict; note that in this step, we will only recurse the directories # that are not already known. # # 2. compute the maxdate for each node of the tree that was not found in the DB. # Build the nodes structure root_date = provenance.directory_get_date_in_isochrone_frontier(directory) root = IsochroneNode(directory, dbdate=root_date) stack = [root] fdates: Dict[Sha1Git, datetime] = {} # map {file_id: date} + counter = 0 while stack: + counter += 1 + if max_directory_size and counter > max_directory_size: + raise DirectoryTooLarge( + f"Max directory size exceeded ({counter}): {directory.id.hex()}" + ) current = stack.pop() if current.dbdate is None or current.dbdate >= revision.date: # If current directory has an associated date in the isochrone frontier that # is greater or equal to the current revision's one, it should be ignored as # the revision is being processed out of order. if current.dbdate is not None and current.dbdate >= revision.date: current.invalidate() # Pre-query all known dates for directories in the current directory # for the provenance object to have them cached and (potentially) improve # performance. current.entry.retrieve_children(archive, minsize=minsize) ddates = provenance.directory_get_dates_in_isochrone_frontier( current.entry.dirs ) for dir in current.entry.dirs: # Recursively analyse subdirectory nodes node = current.add_directory(dir, date=ddates.get(dir.id, None)) stack.append(node) fdates.update(provenance.content_get_early_dates(current.entry.files)) # Precalculate max known date for each node in the graph (only directory nodes are # pushed to the stack). stack = [root] while stack: current = stack.pop() # Current directory node is known if it already has an assigned date (ie. it was # already seen as an isochrone frontier). if current.dbdate is not None: assert current.maxdate is None current.maxdate = current.dbdate else: if any(x.maxdate is None for x in current.children): # at least one child of current has no maxdate yet # Current node needs to be analysed again after its children. 
stack.append(current) for child in current.children: if child.maxdate is None: # if child.maxdate is None, it must be processed stack.append(child) else: # all the files and directories under current have a maxdate, # we can infer the maxdate for current directory assert current.maxdate is None # if all content is already known, update current directory info. current.maxdate = max( [UTCMIN] + [ child.maxdate for child in current.children if child.maxdate is not None # for mypy ] + [ fdates.get(file.id, revision.date) for file in current.entry.files ] ) return root diff --git a/swh/provenance/revision.py b/swh/provenance/revision.py index bc7a60d..10635b4 100644 --- a/swh/provenance/revision.py +++ b/swh/provenance/revision.py @@ -1,230 +1,240 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timezone import logging from typing import Generator, Iterable, Iterator, List, Optional, Tuple from swh.core.statsd import statsd from swh.model.model import Sha1Git from .archive import ArchiveInterface from .directory import directory_flatten -from .graph import IsochroneNode, build_isochrone_graph +from .graph import DirectoryTooLarge, IsochroneNode, build_isochrone_graph from .interface import ProvenanceInterface from .model import DirectoryEntry, RevisionEntry REVISION_DURATION_METRIC = "swh_provenance_revision_content_layer_duration_seconds" logger = logging.getLogger(__name__) EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc) class CSVRevisionIterator: """Iterator over revisions typically present in the given CSV file. The input is an iterator that produces 3 elements per row: (id, date, root) where: - id: is the id (sha1_git) of the revision - date: is the author date - root: sha1 of the directory """ def __init__( self, revisions: Iterable[Tuple[Sha1Git, datetime, Sha1Git]], limit: Optional[int] = None, ) -> None: self.revisions: Iterator[Tuple[Sha1Git, datetime, Sha1Git]] if limit is not None: from itertools import islice self.revisions = islice(revisions, limit) else: self.revisions = iter(revisions) def __iter__(self) -> Generator[RevisionEntry, None, None]: for id, date, root in self.revisions: if date.tzinfo is None: date = date.replace(tzinfo=timezone.utc) yield RevisionEntry(id, date=date, root=root) @statsd.timed(metric=REVISION_DURATION_METRIC, tags={"method": "main"}) def revision_add( provenance: ProvenanceInterface, archive: ArchiveInterface, revisions: List[RevisionEntry], trackall: bool = True, flatten: bool = True, lower: bool = True, mindepth: int = 1, minsize: int = 0, commit: bool = True, + max_directory_size: int = 0, ) -> None: revs_processed = 0 batch_size = len(revisions) revs_to_commit = False for batch_pos, revision in enumerate( sorted(revisions, key=lambda r: r.date or EPOCH) ): assert revision.date is not None assert revision.root is not None # Processed content starting from the revision's root directory. 
date = provenance.revision_get_date(revision) if date is None or revision.date < date: logger.debug( "Processing revision %s on %s (root %s)", revision.id.hex(), revision.date, revision.root.hex(), ) logger.debug("provenance date: %s, building isochrone graph", date) - graph = build_isochrone_graph( - provenance, - archive, - revision, - DirectoryEntry(revision.root), - minsize=minsize, - ) + try: + graph = build_isochrone_graph( + provenance, + archive, + revision, + DirectoryEntry(revision.root), + minsize=minsize, + max_directory_size=max_directory_size, + ) + except DirectoryTooLarge: + logger.warn( + "Ignoring revision %s: root directory %s too large", + revision.id.hex(), + revision.root.hex(), + ) + continue logger.debug("isochrone graph built, processing content") revision_process_content( provenance, archive, revision, graph, trackall=trackall, flatten=flatten, lower=lower, mindepth=mindepth, minsize=minsize, ) revs_processed += 1 revs_to_commit = True if revs_to_commit and commit: flushed = provenance.flush_if_necessary() if flushed: revs_to_commit = False logger.debug( "flushed (rev %s/%s, processed %s)", batch_pos + 1, batch_size, revs_processed, ) if revs_to_commit and commit: logger.debug("flushing batch") provenance.flush() @statsd.timed(metric=REVISION_DURATION_METRIC, tags={"method": "process_content"}) def revision_process_content( provenance: ProvenanceInterface, archive: ArchiveInterface, revision: RevisionEntry, graph: IsochroneNode, trackall: bool = True, flatten: bool = True, lower: bool = True, mindepth: int = 1, minsize: int = 0, ) -> None: assert revision.date is not None provenance.revision_add(revision) stack = [graph] while stack: current = stack.pop() if current.dbdate is not None: assert current.dbdate < revision.date if trackall: # Current directory is an outer isochrone frontier for a previously # processed revision. It should be reused as is. provenance.directory_add_to_revision( revision, current.entry, current.path ) else: assert current.maxdate is not None # Current directory is not an outer isochrone frontier for any previous # revision. It might be eligible for this one. if is_new_frontier( current, revision=revision, lower=lower, mindepth=mindepth, ): # Outer frontier should be moved to current position in the isochrone # graph. This is the first time this directory is found in the isochrone # frontier. provenance.directory_set_date_in_isochrone_frontier( current.entry, current.maxdate ) if trackall: provenance.directory_add_to_revision( revision, current.entry, current.path ) if flatten: directory_flatten( provenance, archive, current.entry, minsize=minsize ) else: # If current node is an invalidated frontier, update its date for future # revisions to get the proper value. if current.invalid: provenance.directory_set_date_in_isochrone_frontier( current.entry, revision.date ) # No point moving the frontier here. Either there are no files or they # are being seen for the first time here. Add all blobs to current # revision updating date if necessary, and recursively analyse # subdirectories as candidates to the outer frontier. 
                for blob in current.entry.files:
                    date = provenance.content_get_early_date(blob)
                    if date is None or revision.date < date:
                        provenance.content_set_early_date(blob, revision.date)
                    provenance.content_add_to_revision(revision, blob, current.path)
                for child in current.children:
                    stack.append(child)


def is_new_frontier(
    node: IsochroneNode,
    revision: RevisionEntry,
    lower: bool = True,
    mindepth: int = 1,
) -> bool:
    assert node.maxdate is not None  # for mypy
    assert revision.date is not None  # idem
    # We want to ensure that all first occurrences end up in the content_early_in_rev
    # relation. Thus, we force every blob outside a frontier to have a strictly
    # earlier date.
    return (
        node.maxdate < revision.date  # all content is earlier than revision
        and node.depth >= mindepth  # deeper than the min allowed depth
        and (has_blobs(node) if lower else True)  # there is at least one blob
    )


def has_blobs(node: IsochroneNode) -> bool:
    # We may want to look for files in different ways to decide whether to define a
    # frontier or not:
    # 1. Only files in current node:
    return any(node.entry.files)
    # 2. Files anywhere in the isochrone graph
    # stack = [node]
    # while stack:
    #     current = stack.pop()
    #     if any(current.entry.files):
    #         return True
    #     else:
    #         # All children are directory entries.
    #         stack.extend(current.children)
    # return False
    # 3. Files in the intermediate directories between current node and any previously
    # defined frontier:
    # TODO: complete this case!
diff --git a/swh/provenance/tests/data/README.md b/swh/provenance/tests/data/README.md
index 15e32da..09cf190 100644
--- a/swh/provenance/tests/data/README.md
+++ b/swh/provenance/tests/data/README.md
@@ -1,188 +1,190 @@
# Provenance Index Test Dataset

This directory contains datasets used by `test_provenance_heurstics` tests of the
provenance index database.

## Datasets

-There are currently 3 dataset:
+There are currently 4 datasets:

- cmdbts2: original dataset
- out-of-order: with unsorted revisions
- with-merge: with merge revisions
+- git-bomb: a clone of https://github.com/Katee/git-bomb dedicated to testing a
+  repo with a very large expanded directory structure.

Each dataset `xxx` consists of several parts:

- a description of a git repository as a yaml file named `xxx_repo.yaml`,
- a msgpack file containing storage objects for the given repository, from which
  the storage is filled before each test using these data, and
- a set of synthetic files, named `synthetic_xxx_(lower|upper)_<mindepth>.txt`,
  describing the expected result in the provenance database if ingested with the
  flag `lower` set or not, and the `mindepth` value (integer, most often `1` or `2`),
- a swh-graph compressed dataset (in the `swhgraph/` directory), used for testing
  the ArchiveGraph backend.

### Generate datasets files

For each dataset `xxx`, execute a number of commands:

```
for dataset in cmdbts2 out-of-order with-merges; do
   python generate_repo.py -C ${dataset}_repo.yaml $dataset > synthetic_${dataset}_template.txt
   # you may want to edit/update synthetic files from this template, see below
   python generate_storage_from_git.py $dataset
   python generate_graph_dataset.py --compress $dataset
done
```

## Git repos description file

The description of a git repository is a yaml file which contains a list of dicts,
each one representing a git revision to add (linearly) in the git repo used as a
base for the dataset.
Each dict consists of a structure like:

``` yaml
- msg: R00
  date: 1000000000
  content:
    A/B/C/a: "content a"
```

this example will generate a git commit with the commit message "R00", the author
and committer date 1000000000 (given as a unix timestamp), and one file whose path
is `A/B/C/a` and content is "content a".

The file is parsed to create git revisions in a temporary git repository, in order
of appearance in the yaml file (so one may create a git repository with
'out-of-order' commits). There is no way of creating branches and merges for now.

The tool to generate this git repo is `generate_repo.py`:

```
python generate_repo.py --help
Usage: generate_repo.py [OPTIONS] INPUT_FILE OUTPUT_DIR

Options:
  -C, --clean-output / --no-clean-output
  --help                          Show this message and exit.
```

It generates a git repository in the `OUTPUT_DIR` and produces a template
`synthetic` file on its standard output, which can be used to ease writing the
expected `synthetic` files.

Typical usage will be:

```
python generate_repo.py repo2_repo.yaml repo2 > synthetic_repo2_template.txt
```

Note that hashes (for revision, directories and content) of the git objects only
depend on the content of the input yaml file. Calling the tool twice on the same
input file should generate the exact same git repo twice.

Also note that the tool will add a branch at each revision (using the commit
message as branch name), to make it easier to reference any point in the git
history.

## Msgpack dump of the storage

This file contains a set of storage objects (`Revision`, `Content` and `Directory`)
and is usually generated from a local git repository (typically the one generated
by the previous command) using the `generate_storage_from_git.py` tool:

```
python generate_storage_from_git.py --help
Usage: generate_storage_from_git.py [OPTIONS] GIT_REPO

  simple tool to generate the CMDBTS.msgpack dataset filed used in tests

Options:
  -r, --head TEXT    head revision to start from
  -o, --output TEXT  output file
  --help             Show this message and exit.
```

Typical usage would be, using the git repository `repo2` created previously:

```
python generate_storage_from_git.py repo2
Revision hash for master is 8363e8e98751dc9f264d2fedd6b829ad4b1218b0
Wrote 86 objects in repo2.msgpack
```

### Adding extra visits/snapshots

It is also possible to generate a storage from a git repo with extra origin
visits, using the `--visit` option of the `generate_storage_from_git` tool. This
option expects a yaml file as argument. This file contains a description of extra
visits (and snapshots) you want to add to the storage.

The format is simple, for example:

```
# a visit pattern scenario for the 'repo_with_merges' repo
- origin: http://repo_with_merges/1/
  date: 1000000015
  branches:
    - R01
```

will create an OriginVisit (at the given date) for the given origin URL (the
Origin will be created as well), with a `Snapshot` including the listed branches.

## Synthetic files

These files describe the expected content of the provenance database for each
revision (in order of ingestion). The `generate_repo.py` tool will produce a
template synthetic file like:

```
1000000000.0 b582a17b3fc37f72fc57877616f85c3f0abed064 R00
R00 | | | R b582a17b3fc37f72fc57877616f85c3f0abed064 | 1000000000.0
| | .
| D a4cb5e6b2831f7e8eef0e6e08e43d642c97303a1 | 0.0 | | A | D 1c8d9fd9afa7e5a2cf52a3db6f05dc5c3a1ca86b | 0.0 | | A/B | D 36876d475197b5ad86ad592e8e28818171455f16 | 0.0 | | A/B/C | D 98f7a4a23d8df1fb1a5055facae2aff9b2d0a8b3 | 0.0 | | A/B/C/a | C 20329687bb9c1231a7e05afe86160343ad49b494 | 0.0 1000000010.0 8259eeae2ff5046f0bb4393d6e894fe6d7e01bfe R01 R01 | | | R 8259eeae2ff5046f0bb4393d6e894fe6d7e01bfe | 1000000010.0 | | . | D b3cf11b22c9f93c3c494cf90ab072f394155072d | 0.0 | | A | D baca735bf8b8720131b4bfdb47c51631a9260348 | 0.0 | | A/B | D 4b28979d88ed209a09c272bcc80f69d9b18339c2 | 0.0 | | A/B/C | D c9cabe7f49012e3fdef6ac6b929efb5654f583cf | 0.0 | | A/B/C/a | C 20329687bb9c1231a7e05afe86160343ad49b494 | 0.0 | | A/B/C/b | C 50e9cdb03f9719261dd39d7f2920b906db3711a3 | 0.0 [...] ``` where all the content and directories of each revision are listed; it's then the responsibility of the user to create the expected synthetic file for a given heuristics configuration. For example, the 2 revisions above are to be adapted, for the `(lower=True, mindepth=1)` case, as: ``` 1000000000 c0d8929936631ecbcf9147be6b8aa13b13b014e4 R00 R00 | | | R c0d8929936631ecbcf9147be6b8aa13b13b014e4 | 1000000000 | R---C | A/B/C/a | C 20329687bb9c1231a7e05afe86160343ad49b494 | 0 1000000010 1444db96cbd8cd791abe83527becee73d3c64e86 R01 R01 | | | R 1444db96cbd8cd791abe83527becee73d3c64e86 | 1000000010 | R---C | A/B/C/a | C 20329687bb9c1231a7e05afe86160343ad49b494 | -10 | R---C | A/B/C/b | C 50e9cdb03f9719261dd39d7f2920b906db3711a3 | 0 ``` diff --git a/swh/provenance/tests/data/git-bomb.msgpack b/swh/provenance/tests/data/git-bomb.msgpack new file mode 100644 index 0000000..2f339da Binary files /dev/null and b/swh/provenance/tests/data/git-bomb.msgpack differ diff --git a/swh/provenance/tests/test_isochrone_graph.py b/swh/provenance/tests/test_isochrone_graph.py index 11d5881..1ea29fb 100644 --- a/swh/provenance/tests/test_isochrone_graph.py +++ b/swh/provenance/tests/test_isochrone_graph.py @@ -1,113 +1,148 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from copy import deepcopy from datetime import datetime, timezone from typing import Any, Dict import pytest import yaml from swh.model.hashutil import hash_to_bytes from swh.provenance.archive import ArchiveInterface -from swh.provenance.graph import IsochroneNode, build_isochrone_graph +from swh.provenance.graph import DirectoryTooLarge, IsochroneNode, build_isochrone_graph from swh.provenance.interface import ProvenanceInterface from swh.provenance.model import DirectoryEntry, RevisionEntry from swh.provenance.revision import revision_add from swh.provenance.tests.conftest import ( fill_storage, get_datafile, load_repo_data, ts2dt, ) def isochrone_graph_from_dict(d: Dict[str, Any], depth: int = 0) -> IsochroneNode: """Takes a dictionary representing a tree of IsochroneNode objects, and recursively builds the corresponding graph.""" d = deepcopy(d) d["entry"]["id"] = hash_to_bytes(d["entry"]["id"]) d["entry"]["name"] = bytes(d["entry"]["name"], encoding="utf-8") dbdate = d.get("dbdate", None) if dbdate is not None: dbdate = datetime.fromtimestamp(d["dbdate"], timezone.utc) children = d.get("children", []) node = IsochroneNode( entry=DirectoryEntry(**d["entry"]), dbdate=dbdate, depth=depth, ) node.maxdate = datetime.fromtimestamp(d["maxdate"], timezone.utc) node.invalid = 
d.get("invalid", False)
    node.path = bytes(d["path"], encoding="utf-8")
    node.children = set(
        isochrone_graph_from_dict(child, depth=depth + 1) for child in children
    )
    return node


@pytest.mark.parametrize(
    "repo, lower, mindepth",
    (
        ("cmdbts2", True, 1),
        ("cmdbts2", False, 1),
        ("cmdbts2", True, 2),
        ("cmdbts2", False, 2),
        ("out-of-order", True, 1),
    ),
)
@pytest.mark.parametrize("batch", (True, False))
def test_isochrone_graph(
    provenance: ProvenanceInterface,
    archive: ArchiveInterface,
    repo: str,
    lower: bool,
    mindepth: int,
    batch: bool,
) -> None:
    # read data/README.md for more details on how these datasets are generated
    data = load_repo_data(repo)
    fill_storage(archive.storage, data)
    revisions = {rev["id"]: rev for rev in data["revision"]}

    filename = f"graphs_{repo}_{'lower' if lower else 'upper'}_{mindepth}.yaml"
    with open(get_datafile(filename)) as file:
        for expected in yaml.full_load(file):
            print("# Processing revision", expected["rev"])
            revision = revisions[hash_to_bytes(expected["rev"])]
            entry = RevisionEntry(
                id=revision["id"],
                date=ts2dt(revision["date"]),
                root=revision["directory"],
            )
            expected_graph = isochrone_graph_from_dict(expected["graph"])
            print("Expected graph:", expected_graph)

            # Create graph for current revision and check it has the expected structure.
            assert entry.root is not None
            computed_graph = build_isochrone_graph(
                provenance,
                archive,
                entry,
                DirectoryEntry(entry.root),
            )
            print("Computed graph:", computed_graph)
            assert computed_graph == expected_graph

            # Add current revision so that provenance info is kept up to date for the
            # following ones.
            revision_add(
                provenance,
                archive,
                [entry],
                lower=lower,
                mindepth=mindepth,
                commit=not batch,
            )
+
+
+def test_isochrone_graph_max_dir_size(
+    provenance: ProvenanceInterface,
+    archive: ArchiveInterface,
+):
+    data = load_repo_data("git-bomb")
+    fill_storage(archive.storage, data)
+
+    rev = archive.storage.revision_get(
+        [hash_to_bytes("7af99c9e7d4768fa681f4fe4ff61259794cf719b")]
+    )[0]
+    assert rev is not None
+    assert rev.date is not None
+
+    with pytest.raises(DirectoryTooLarge, match="Max directory size exceeded"):
+        build_isochrone_graph(
+            provenance,
+            archive,
+            RevisionEntry(id=rev.id, date=rev.date.to_datetime(), root=rev.directory),
+            DirectoryEntry(rev.directory),
+            max_directory_size=1000,
+        )
+
+    # From this directory, there should be only ~1k recursive entries, so the
+    # call to build_isochrone_graph with max_directory_size=1200 should succeed
+    dir_id = hash_to_bytes("3e50041e82b225ca9e9b2641548b0c1b81eb971b")
+    build_isochrone_graph(
+        provenance,
+        archive,
+        RevisionEntry(id=rev.id, date=rev.date.to_datetime(), root=dir_id),
+        DirectoryEntry(dir_id),
+        max_directory_size=1200,
+    )
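
Reviewer note: the sketch below summarises how the new `max_directory_size` knob introduced in this diff is meant to be used outside of `revision_add()`. It is illustrative only: `try_build_graph` and the `1000` limit are hypothetical, and the `provenance`/`archive` objects are assumed to come from `get_provenance()`/`get_archive()` as wired up in `cli.py`. The counter in `build_isochrone_graph()` is incremented once per directory node popped from the traversal stack, so the limit bounds the amount of work even for git-bomb style repositories whose expanded tree is vastly larger than their object count.

``` python
# Hypothetical helper, not part of this diff: shows the new low-level API surface.
from typing import Optional

from swh.provenance.archive import ArchiveInterface
from swh.provenance.graph import DirectoryTooLarge, IsochroneNode, build_isochrone_graph
from swh.provenance.interface import ProvenanceInterface
from swh.provenance.model import DirectoryEntry, RevisionEntry


def try_build_graph(
    provenance: ProvenanceInterface,
    archive: ArchiveInterface,
    revision: RevisionEntry,
    limit: int = 1000,  # illustrative limit; 0 keeps the previous unbounded behaviour
) -> Optional[IsochroneNode]:
    """Build the isochrone graph for ``revision``, or return None if its root
    directory expands to more than ``limit`` directory nodes."""
    try:
        return build_isochrone_graph(
            provenance,
            archive,
            revision,
            DirectoryEntry(revision.root),
            max_directory_size=limit,
        )
    except DirectoryTooLarge:
        # revision_add() in revision.py now does the same: log a warning and skip.
        return None
```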