diff --git a/swh/provenance/algos/__init__.py b/swh/provenance/algos/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/swh/provenance/directory.py b/swh/provenance/algos/directory.py similarity index 95% rename from swh/provenance/directory.py rename to swh/provenance/algos/directory.py index 4f3ebef..06db86f 100644 --- a/swh/provenance/directory.py +++ b/swh/provenance/algos/directory.py @@ -1,108 +1,107 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os from typing import Generator, Iterable, Iterator, List, Optional from swh.core.statsd import statsd from swh.model.model import Sha1Git - -from .archive import ArchiveInterface -from .interface import ProvenanceInterface -from .model import DirectoryEntry +from swh.provenance.archive import ArchiveInterface +from swh.provenance.interface import ProvenanceInterface +from swh.provenance.model import DirectoryEntry REVISION_DURATION_METRIC = "swh_provenance_directory_duration_seconds" class CSVDirectoryIterator: """Iterator over directories typically present in the given CSV file. The input is an iterator that produces ids (sha1_git) of directories """ def __init__( self, directories: Iterable[Sha1Git], limit: Optional[int] = None, ) -> None: self.directories: Iterator[Sha1Git] if limit is not None: from itertools import islice self.directories = islice(directories, limit) else: self.directories = iter(directories) def __iter__(self) -> Generator[DirectoryEntry, None, None]: for id in self.directories: yield DirectoryEntry(id) def directory_flatten_range( provenance: ProvenanceInterface, archive: ArchiveInterface, start_id: Sha1Git, end_id: Sha1Git, minsize: int = 0, commit: bool = True, ) -> None: """Flatten the known directories from ``start_id`` to ``end_id``.""" current = start_id while current < end_id: dirs = provenance.storage.directory_iter_not_flattenned( limit=100, start_id=current ) if not dirs: break directory_add( provenance, archive, [DirectoryEntry(id=d) for d in dirs], minsize, commit ) current = dirs[-1] @statsd.timed(metric=REVISION_DURATION_METRIC, tags={"method": "add"}) def directory_add( provenance: ProvenanceInterface, archive: ArchiveInterface, directories: List[DirectoryEntry], minsize: int = 0, commit: bool = True, ) -> None: for directory in directories: # Only flatten directories that are present in the provenance model, but not # flattenned yet. flattenned = provenance.directory_already_flattenned(directory) if flattenned is not None and not flattenned: directory_flatten( provenance, archive, directory, minsize=minsize, ) if commit: provenance.flush() @statsd.timed(metric=REVISION_DURATION_METRIC, tags={"method": "flatten"}) def directory_flatten( provenance: ProvenanceInterface, archive: ArchiveInterface, directory: DirectoryEntry, minsize: int = 0, ) -> None: """Recursively retrieve all the files of 'directory' and insert them in the 'provenance' database in the 'content_to_directory' table. """ stack = [(directory, b"")] while stack: current, prefix = stack.pop() current.retrieve_children(archive, minsize=minsize) for f_child in current.files: # Add content to the directory with the computed prefix. provenance.content_add_to_directory(directory, f_child, prefix) for d_child in current.dirs: # Recursively walk the child directory. 
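For reference, the stack-based walk performed by directory_flatten can be sketched on its own. This is an illustrative sketch only, using toy stand-ins for DirectoryEntry/FileEntry and for the provenance object; none of these are the real swh.provenance classes:

```python
import os
from dataclasses import dataclass, field
from typing import List


@dataclass(frozen=True)
class ToyFile:
    name: bytes


@dataclass
class ToyDir:
    name: bytes
    files: List[ToyFile] = field(default_factory=list)
    dirs: List["ToyDir"] = field(default_factory=list)


def flatten(root: ToyDir) -> List[bytes]:
    """Return the path (relative to ``root``) of every reachable file,
    mirroring the stack-based walk used by directory_flatten."""
    paths = []
    stack = [(root, b"")]
    while stack:
        current, prefix = stack.pop()
        for f in current.files:
            # In the real code this is provenance.content_add_to_directory(...)
            paths.append(os.path.join(prefix, f.name))
        for d in current.dirs:
            # Children are pushed with their accumulated byte-string prefix.
            stack.append((d, os.path.join(prefix, d.name)))
    return paths


tree = ToyDir(b"", files=[ToyFile(b"a")], dirs=[ToyDir(b"C", files=[ToyFile(b"b")])])
assert set(flatten(tree)) == {b"a", os.path.join(b"C", b"b")}
```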
stack.append((d_child, os.path.join(prefix, d_child.name))) provenance.directory_flag_as_flattenned(directory) diff --git a/swh/provenance/graph.py b/swh/provenance/algos/isochrone_graph.py similarity index 80% rename from swh/provenance/graph.py rename to swh/provenance/algos/isochrone_graph.py index aab6751..f2b0d46 100644 --- a/swh/provenance/graph.py +++ b/swh/provenance/algos/isochrone_graph.py @@ -1,226 +1,181 @@ -# Copyright (C) 2021 The Software Heritage developers +# Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from __future__ import annotations from datetime import datetime, timezone import os from typing import Any, Dict, Optional, Set from swh.core.statsd import statsd from swh.model.model import Sha1Git - -from .archive import ArchiveInterface -from .interface import ProvenanceInterface -from .model import DirectoryEntry, RevisionEntry +from swh.provenance.archive import ArchiveInterface +from swh.provenance.interface import ProvenanceInterface +from swh.provenance.model import DirectoryEntry, RevisionEntry GRAPH_DURATION_METRIC = "swh_provenance_graph_duration_seconds" GRAPH_OPERATIONS_METRIC = "swh_provenance_graph_operations_total" UTCMIN = datetime.min.replace(tzinfo=timezone.utc) class DirectoryTooLarge(ValueError): pass -class HistoryGraph: - @statsd.timed(metric=GRAPH_DURATION_METRIC, tags={"method": "build_history_graph"}) - def __init__( - self, - archive: ArchiveInterface, - revision: RevisionEntry, - ) -> None: - self.head_id = revision.id - self._nodes: Set[Sha1Git] = set() - # rev -> set(parents) - self._edges: Dict[Sha1Git, Set[Sha1Git]] = {} - - stack = {self.head_id} - while stack: - current = stack.pop() - - if current not in self._nodes: - self._nodes.add(current) - self._edges.setdefault(current, set()) - for rev, parent in archive.revision_get_some_outbound_edges(current): - self._nodes.add(rev) - self._edges.setdefault(rev, set()).add(parent) - stack.add(parent) - - # don't process nodes for which we've already retrieved outbound edges - stack -= self._nodes - - def parent_ids(self) -> Set[Sha1Git]: - """Get all the known parent ids in the current graph""" - return self._nodes - {self.head_id} - - def __str__(self) -> str: - return f" Dict[str, Any]: - return { - "head": self.head_id.hex(), - "graph": { - node.hex(): sorted(parent.hex() for parent in parents) - for node, parents in self._edges.items() - }, - } - - class IsochroneNode: def __init__( self, entry: DirectoryEntry, dbdate: Optional[datetime] = None, depth: int = 0, prefix: bytes = b"", ) -> None: self.entry = entry self.depth = depth # dbdate is the maxdate for this node that comes from the DB self._dbdate: Optional[datetime] = dbdate # maxdate is set by the maxdate computation algorithm self.maxdate: Optional[datetime] = None self.invalid = False self.path = os.path.join(prefix, self.entry.name) if prefix else self.entry.name self.children: Set[IsochroneNode] = set() @property def dbdate(self) -> Optional[datetime]: # use a property to make this attribute (mostly) read-only return self._dbdate def invalidate(self) -> None: statsd.increment( metric=GRAPH_OPERATIONS_METRIC, tags={"method": "invalidate_frontier"} ) self._dbdate = None self.maxdate = None self.invalid = True def add_directory( self, child: DirectoryEntry, date: Optional[datetime] = None ) -> IsochroneNode: # we should not be 
processing this node (ie add subdirectories or files) if it's # actually known by the provenance DB assert self.dbdate is None node = IsochroneNode(child, dbdate=date, depth=self.depth + 1, prefix=self.path) self.children.add(node) return node def __str__(self) -> str: return ( f"<{self.entry}: depth={self.depth}, dbdate={self.dbdate}, " f"maxdate={self.maxdate}, invalid={self.invalid}, path={self.path!r}, " f"children=[{', '.join(str(child) for child in self.children)}]>" ) def __eq__(self, other: Any) -> bool: return isinstance(other, IsochroneNode) and self.__dict__ == other.__dict__ def __hash__(self) -> int: # only immutable attributes are considered to compute hash return hash((self.entry, self.depth, self.path)) @statsd.timed(metric=GRAPH_DURATION_METRIC, tags={"method": "build_isochrone_graph"}) def build_isochrone_graph( provenance: ProvenanceInterface, archive: ArchiveInterface, revision: RevisionEntry, directory: DirectoryEntry, minsize: int = 0, max_directory_size: int = 0, ) -> IsochroneNode: assert revision.date is not None assert revision.root == directory.id # this function process a revision in 2 steps: # # 1. build the tree structure of IsochroneNode objects (one INode per # directory under the root directory of the revision but not following # known subdirectories), and gather the dates from the DB for already # known objects; for files, just keep all the dates in a global 'fdates' # dict; note that in this step, we will only recurse the directories # that are not already known. # # 2. compute the maxdate for each node of the tree that was not found in the DB. # Build the nodes structure root_date = provenance.directory_get_date_in_isochrone_frontier(directory) root = IsochroneNode(directory, dbdate=root_date) stack = [root] fdates: Dict[Sha1Git, datetime] = {} # map {file_id: date} counter = 0 while stack: counter += 1 if max_directory_size and counter > max_directory_size: raise DirectoryTooLarge( f"Max directory size exceeded ({counter}): {directory.id.hex()}" ) current = stack.pop() if current.dbdate is None or current.dbdate >= revision.date: # If current directory has an associated date in the isochrone frontier that # is greater or equal to the current revision's one, it should be ignored as # the revision is being processed out of order. if current.dbdate is not None and current.dbdate >= revision.date: current.invalidate() # Pre-query all known dates for directories in the current directory # for the provenance object to have them cached and (potentially) improve # performance. current.entry.retrieve_children(archive, minsize=minsize) ddates = provenance.directory_get_dates_in_isochrone_frontier( current.entry.dirs ) for dir in current.entry.dirs: # Recursively analyse subdirectory nodes node = current.add_directory(dir, date=ddates.get(dir.id, None)) stack.append(node) fdates.update(provenance.content_get_early_dates(current.entry.files)) # Precalculate max known date for each node in the graph (only directory nodes are # pushed to the stack). stack = [root] while stack: current = stack.pop() # Current directory node is known if it already has an assigned date (ie. it was # already seen as an isochrone frontier). if current.dbdate is not None: assert current.maxdate is None current.maxdate = current.dbdate else: if any(x.maxdate is None for x in current.children): # at least one child of current has no maxdate yet # Current node needs to be analysed again after its children. 
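The maxdate pass that follows (re-pushing a node until all of its children are resolved) can be illustrated in isolation. This is a simplified sketch with a toy node type; it ignores the dbdate short-circuit and is not the real IsochroneNode:

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import List, Optional

UTCMIN = datetime.min.replace(tzinfo=timezone.utc)


@dataclass
class ToyNode:
    file_dates: List[datetime] = field(default_factory=list)
    children: List["ToyNode"] = field(default_factory=list)
    maxdate: Optional[datetime] = None


def compute_maxdates(root: ToyNode) -> None:
    stack = [root]
    while stack:
        current = stack.pop()
        if any(child.maxdate is None for child in current.children):
            # Some child is not resolved yet: revisit current after its children.
            stack.append(current)
            stack.extend(c for c in current.children if c.maxdate is None)
        else:
            # All children resolved: take the max over children and local files.
            current.maxdate = max(
                [UTCMIN]
                + [c.maxdate for c in current.children if c.maxdate is not None]
                + current.file_dates
            )


d1 = datetime(2020, 1, 1, tzinfo=timezone.utc)
d2 = datetime(2021, 1, 1, tzinfo=timezone.utc)
root = ToyNode(file_dates=[d1], children=[ToyNode(file_dates=[d2])])
compute_maxdates(root)
assert root.maxdate == d2
```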
stack.append(current) for child in current.children: if child.maxdate is None: # if child.maxdate is None, it must be processed stack.append(child) else: # all the files and directories under current have a maxdate, # we can infer the maxdate for current directory assert current.maxdate is None # if all content is already known, update current directory info. current.maxdate = max( [UTCMIN] + [ child.maxdate for child in current.children if child.maxdate is not None # for mypy ] + [ fdates.get(file.id, revision.date) for file in current.entry.files ] ) return root diff --git a/swh/provenance/origin.py b/swh/provenance/algos/origin.py similarity index 70% rename from swh/provenance/origin.py rename to swh/provenance/algos/origin.py index 7a8caf4..13c3230 100644 --- a/swh/provenance/origin.py +++ b/swh/provenance/algos/origin.py @@ -1,143 +1,185 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime from itertools import islice import logging -from typing import Generator, Iterable, Iterator, List, Optional, Tuple +from typing import Any, Dict, Generator, Iterable, Iterator, List, Optional, Set, Tuple from swh.core.statsd import statsd from swh.model.model import Sha1Git +from swh.provenance.archive import ArchiveInterface +from swh.provenance.interface import ProvenanceInterface +from swh.provenance.model import OriginEntry, RevisionEntry -from .archive import ArchiveInterface -from .graph import HistoryGraph -from .interface import ProvenanceInterface -from .model import OriginEntry - -ORIGIN_DURATION_METRIC = "swh_provenance_origin_revision_layer_duration_seconds" +ORIGIN_DURATION_METRIC = "swh_provenance_origin_duration_seconds" LOG_FORMAT = ( "%(levelname) -10s %(asctime)s %(name) -30s %(funcName) " "-35s %(lineno) -5d: %(message)s" ) LOGGER = logging.getLogger(__name__) +class HistoryGraph: + @statsd.timed(metric=ORIGIN_DURATION_METRIC, tags={"method": "HistoryGraph"}) + def __init__( + self, + archive: ArchiveInterface, + revision: RevisionEntry, + ) -> None: + self.head_id = revision.id + self._nodes: Set[Sha1Git] = set() + # rev -> set(parents) + self._edges: Dict[Sha1Git, Set[Sha1Git]] = {} + + stack = {self.head_id} + while stack: + current = stack.pop() + + if current not in self._nodes: + self._nodes.add(current) + self._edges.setdefault(current, set()) + for rev, parent in archive.revision_get_some_outbound_edges(current): + self._nodes.add(rev) + self._edges.setdefault(rev, set()).add(parent) + stack.add(parent) + + # don't process nodes for which we've already retrieved outbound edges + stack -= self._nodes + + def parent_ids(self) -> Set[Sha1Git]: + """Get all the known parent ids in the current graph""" + return self._nodes - {self.head_id} + + def __str__(self) -> str: + return f" Dict[str, Any]: + return { + "head": self.head_id.hex(), + "graph": { + node.hex(): sorted(parent.hex() for parent in parents) + for node, parents in self._edges.items() + }, + } + + class CSVOriginIterator: """Iterator over origin visit statuses typically present in the given CSV file. 
The input is an iterator that produces 2 elements per row: (url, snap) where: - url: is the origin url of the visit - snap: sha1_git of the snapshot pointed by the visit status """ def __init__( self, statuses: Iterable[Tuple[str, Sha1Git]], limit: Optional[int] = None, ) -> None: self.statuses: Iterator[Tuple[str, Sha1Git]] if limit is not None: self.statuses = islice(statuses, limit) else: self.statuses = iter(statuses) def __iter__(self) -> Generator[OriginEntry, None, None]: return (OriginEntry(url, snapshot) for url, snapshot in self.statuses) @statsd.timed(metric=ORIGIN_DURATION_METRIC, tags={"method": "main"}) def origin_add( provenance: ProvenanceInterface, archive: ArchiveInterface, origins: List[OriginEntry], commit: bool = True, ) -> None: for origin in origins: process_origin(provenance, archive, origin) if commit: start = datetime.now() LOGGER.debug("Flushing cache") provenance.flush() LOGGER.info("Cache flushed in %s", (datetime.now() - start)) @statsd.timed(metric=ORIGIN_DURATION_METRIC, tags={"method": "process_origin"}) def process_origin( provenance: ProvenanceInterface, archive: ArchiveInterface, origin: OriginEntry ) -> None: LOGGER.info("Processing origin=%s", origin) start = datetime.now() LOGGER.debug("Add origin") provenance.origin_add(origin) LOGGER.debug("Retrieving head revisions") origin.retrieve_revisions(archive) LOGGER.info("%d heads founds", origin.revision_count) for idx, revision in enumerate(origin.revisions): LOGGER.info( "checking revision %s (%d/%d)", revision, idx + 1, origin.revision_count ) if not provenance.revision_is_head(revision): LOGGER.debug("revision %s not in heads", revision) graph = HistoryGraph(archive, revision) LOGGER.debug("History graph built") origin_add_revision(provenance, origin, graph) LOGGER.debug("Revision added") # head is treated separately LOGGER.debug("Checking preferred origin") check_preferred_origin(provenance, origin, revision.id) LOGGER.debug("Adding revision to origin") provenance.revision_add_to_origin(origin, revision) cache_flush_start = datetime.now() if provenance.flush_if_necessary(): LOGGER.info( "Intermediate cache flush in %s", (datetime.now() - cache_flush_start) ) end = datetime.now() LOGGER.info("Processed origin %s in %s", origin.url, (end - start)) @statsd.timed(metric=ORIGIN_DURATION_METRIC, tags={"method": "process_revision"}) def origin_add_revision( provenance: ProvenanceInterface, origin: OriginEntry, graph: HistoryGraph, ) -> None: for parent_id in graph.parent_ids(): check_preferred_origin(provenance, origin, parent_id) # create a link between it and the head, and recursively walk its history provenance.revision_add_before_revision( head_id=graph.head_id, revision_id=parent_id ) @statsd.timed(metric=ORIGIN_DURATION_METRIC, tags={"method": "check_preferred_origin"}) def check_preferred_origin( provenance: ProvenanceInterface, origin: OriginEntry, revision_id: Sha1Git, ) -> None: # if the revision has no preferred origin just set the given origin as the # preferred one. TODO: this should be improved in the future! 
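The HistoryGraph traversal defined above can be sketched with a plain dict standing in for archive.revision_get_some_outbound_edges (which, in the real code, may return edges for several revisions at once, not just the popped one); everything below is illustrative only:

```python
from typing import Dict, Set

# Toy history: revision -> set of parents.
EDGES: Dict[bytes, Set[bytes]] = {
    b"R3": {b"R2"},
    b"R2": {b"R1"},
    b"R1": set(),
}


def parent_ids(head: bytes) -> Set[bytes]:
    """Collect every ancestor of ``head`` with a set-based worklist."""
    nodes: Set[bytes] = set()
    edges: Dict[bytes, Set[bytes]] = {}
    stack = {head}
    while stack:
        current = stack.pop()
        if current not in nodes:
            nodes.add(current)
            edges.setdefault(current, set())
            for parent in EDGES.get(current, set()):
                edges[current].add(parent)
                stack.add(parent)
        # Never re-expand revisions whose outbound edges were already fetched.
        stack -= nodes
    return nodes - {head}


assert parent_ids(b"R3") == {b"R1", b"R2"}
```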
preferred = provenance.revision_get_preferred_origin(revision_id) if preferred is None: provenance.revision_set_preferred_origin(origin, revision_id) diff --git a/swh/provenance/revision.py b/swh/provenance/algos/revision.py similarity index 95% rename from swh/provenance/revision.py rename to swh/provenance/algos/revision.py index 10635b4..7d86970 100644 --- a/swh/provenance/revision.py +++ b/swh/provenance/algos/revision.py @@ -1,240 +1,240 @@ -# Copyright (C) 2021 The Software Heritage developers +# Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timezone import logging from typing import Generator, Iterable, Iterator, List, Optional, Tuple from swh.core.statsd import statsd from swh.model.model import Sha1Git +from swh.provenance.archive import ArchiveInterface +from swh.provenance.interface import ProvenanceInterface +from swh.provenance.model import DirectoryEntry, RevisionEntry -from .archive import ArchiveInterface from .directory import directory_flatten -from .graph import DirectoryTooLarge, IsochroneNode, build_isochrone_graph -from .interface import ProvenanceInterface -from .model import DirectoryEntry, RevisionEntry +from .isochrone_graph import DirectoryTooLarge, IsochroneNode, build_isochrone_graph -REVISION_DURATION_METRIC = "swh_provenance_revision_content_layer_duration_seconds" +REVISION_DURATION_METRIC = "swh_provenance_revision_duration_seconds" logger = logging.getLogger(__name__) EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc) class CSVRevisionIterator: """Iterator over revisions typically present in the given CSV file. The input is an iterator that produces 3 elements per row: (id, date, root) where: - id: is the id (sha1_git) of the revision - date: is the author date - root: sha1 of the directory """ def __init__( self, revisions: Iterable[Tuple[Sha1Git, datetime, Sha1Git]], limit: Optional[int] = None, ) -> None: self.revisions: Iterator[Tuple[Sha1Git, datetime, Sha1Git]] if limit is not None: from itertools import islice self.revisions = islice(revisions, limit) else: self.revisions = iter(revisions) def __iter__(self) -> Generator[RevisionEntry, None, None]: for id, date, root in self.revisions: if date.tzinfo is None: date = date.replace(tzinfo=timezone.utc) yield RevisionEntry(id, date=date, root=root) @statsd.timed(metric=REVISION_DURATION_METRIC, tags={"method": "main"}) def revision_add( provenance: ProvenanceInterface, archive: ArchiveInterface, revisions: List[RevisionEntry], trackall: bool = True, flatten: bool = True, lower: bool = True, mindepth: int = 1, minsize: int = 0, commit: bool = True, max_directory_size: int = 0, ) -> None: revs_processed = 0 batch_size = len(revisions) revs_to_commit = False for batch_pos, revision in enumerate( sorted(revisions, key=lambda r: r.date or EPOCH) ): assert revision.date is not None assert revision.root is not None # Processed content starting from the revision's root directory. 
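The intent of CSVRevisionIterator plus the date-sorted loop in revision_add can be shown with plain tuples standing in for RevisionEntry; a hedged sketch, assuming naive author dates are meant as UTC (which is what the iterator does):

```python
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

# CSV-style rows: (revision id, author date, root directory id).
rows = [
    (bytes.fromhex("02" * 20), datetime(2021, 6, 1, 12, 0), bytes.fromhex("0b" * 20)),
    (bytes.fromhex("01" * 20), datetime(2020, 1, 1, 0, 0), bytes.fromhex("0a" * 20)),
]

entries = []
for rev_id, date, root in rows:
    if date.tzinfo is None:
        # Naive author dates are normalized to UTC before use.
        date = date.replace(tzinfo=timezone.utc)
    entries.append((rev_id, date, root))

# revision_add processes revisions oldest-first (missing dates sort as EPOCH).
entries.sort(key=lambda e: e[1] or EPOCH)
assert [e[1].year for e in entries] == [2020, 2021]
```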
date = provenance.revision_get_date(revision) if date is None or revision.date < date: logger.debug( "Processing revision %s on %s (root %s)", revision.id.hex(), revision.date, revision.root.hex(), ) logger.debug("provenance date: %s, building isochrone graph", date) try: graph = build_isochrone_graph( provenance, archive, revision, DirectoryEntry(revision.root), minsize=minsize, max_directory_size=max_directory_size, ) except DirectoryTooLarge: logger.warn( "Ignoring revision %s: root directory %s too large", revision.id.hex(), revision.root.hex(), ) continue logger.debug("isochrone graph built, processing content") revision_process_content( provenance, archive, revision, graph, trackall=trackall, flatten=flatten, lower=lower, mindepth=mindepth, minsize=minsize, ) revs_processed += 1 revs_to_commit = True if revs_to_commit and commit: flushed = provenance.flush_if_necessary() if flushed: revs_to_commit = False logger.debug( "flushed (rev %s/%s, processed %s)", batch_pos + 1, batch_size, revs_processed, ) if revs_to_commit and commit: logger.debug("flushing batch") provenance.flush() @statsd.timed(metric=REVISION_DURATION_METRIC, tags={"method": "process_content"}) def revision_process_content( provenance: ProvenanceInterface, archive: ArchiveInterface, revision: RevisionEntry, graph: IsochroneNode, trackall: bool = True, flatten: bool = True, lower: bool = True, mindepth: int = 1, minsize: int = 0, ) -> None: assert revision.date is not None provenance.revision_add(revision) stack = [graph] while stack: current = stack.pop() if current.dbdate is not None: assert current.dbdate < revision.date if trackall: # Current directory is an outer isochrone frontier for a previously # processed revision. It should be reused as is. provenance.directory_add_to_revision( revision, current.entry, current.path ) else: assert current.maxdate is not None # Current directory is not an outer isochrone frontier for any previous # revision. It might be eligible for this one. if is_new_frontier( current, revision=revision, lower=lower, mindepth=mindepth, ): # Outer frontier should be moved to current position in the isochrone # graph. This is the first time this directory is found in the isochrone # frontier. provenance.directory_set_date_in_isochrone_frontier( current.entry, current.maxdate ) if trackall: provenance.directory_add_to_revision( revision, current.entry, current.path ) if flatten: directory_flatten( provenance, archive, current.entry, minsize=minsize ) else: # If current node is an invalidated frontier, update its date for future # revisions to get the proper value. if current.invalid: provenance.directory_set_date_in_isochrone_frontier( current.entry, revision.date ) # No point moving the frontier here. Either there are no files or they # are being seen for the first time here. Add all blobs to current # revision updating date if necessary, and recursively analyse # subdirectories as candidates to the outer frontier. for blob in current.entry.files: date = provenance.content_get_early_date(blob) if date is None or revision.date < date: provenance.content_set_early_date(blob, revision.date) provenance.content_add_to_revision(revision, blob, current.path) for child in current.children: stack.append(child) def is_new_frontier( node: IsochroneNode, revision: RevisionEntry, lower: bool = True, mindepth: int = 1, ) -> bool: assert node.maxdate is not None # for mypy assert revision.date is not None # idem # We want to ensure that all first occurrences end up in the content_early_in_rev # relation. 
Thus, we force for every blob outside a frontier to have an strictly # earlier date. return ( node.maxdate < revision.date # all content is earlier than revision and node.depth >= mindepth # deeper than the min allowed depth and (has_blobs(node) if lower else True) # there is at least one blob ) def has_blobs(node: IsochroneNode) -> bool: # We may want to look for files in different ways to decide whether to define a # frontier or not: # 1. Only files in current node: return any(node.entry.files) # 2. Files anywhere in the isochrone graph # stack = [node] # while stack: # current = stack.pop() # if any(current.entry.files): # return True # else: # # All children are directory entries. # stack.extend(current.children) # return False # 3. Files in the intermediate directories between current node and any previously # defined frontier: # TODO: complete this case! diff --git a/swh/provenance/cli.py b/swh/provenance/cli.py index 187559c..6b46269 100644 --- a/swh/provenance/cli.py +++ b/swh/provenance/cli.py @@ -1,688 +1,688 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # WARNING: do not import unnecessary things here to keep cli startup time under # control from datetime import datetime, timezone from functools import partial import os from typing import Any, Dict, Generator, Optional, Tuple import click from deprecated import deprecated import iso8601 import yaml try: from systemd.daemon import notify except ImportError: notify = None from swh.core import config from swh.core.cli import CONTEXT_SETTINGS from swh.core.cli import swh as swh_cli_group from swh.model.hashutil import hash_to_bytes, hash_to_hex from swh.model.model import Sha1Git # All generic config code should reside in swh.core.config CONFIG_ENVVAR = "SWH_CONFIG_FILENAME" DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, None) DEFAULT_CONFIG: Dict[str, Any] = { "provenance": { "archive": { # Storage API based Archive object # "cls": "api", # "storage": { # "cls": "remote", # "url": "http://uffizi.internal.softwareheritage.org:5002", # } # Direct access Archive object "cls": "direct", "db": { "host": "belvedere.internal.softwareheritage.org", "port": 5432, "dbname": "softwareheritage", "user": "guest", }, }, "storage": { # Local PostgreSQL Storage # "cls": "postgresql", # "db": { # "host": "localhost", # "user": "postgres", # "password": "postgres", # "dbname": "provenance", # }, # Remote RabbitMQ/PostgreSQL Storage "cls": "rabbitmq", "url": "amqp://localhost:5672/%2f", "storage_config": { "cls": "postgresql", "db": { "host": "localhost", "user": "postgres", "password": "postgres", "dbname": "provenance", }, }, "batch_size": 100, "prefetch_count": 100, }, } } CONFIG_FILE_HELP = f""" \b Configuration can be loaded from a yaml file given either as --config-file option or the {CONFIG_ENVVAR} environment variable. 
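Assuming the configuration layout shown in DEFAULT_CONFIG above, wiring a configuration dict to get_archive/get_provenance follows the same pattern as the CLI group callbacks; a sketch only, with placeholder host names and credentials rather than working endpoints:

```python
import yaml

from swh.provenance import get_provenance
from swh.provenance.archive import get_archive

CONFIG_YAML = """
provenance:
  archive:
    cls: api
    storage:
      cls: remote
      url: http://localhost:5002
  storage:
    cls: postgresql
    db:
      host: localhost
      user: postgres
      password: postgres
      dbname: provenance
"""

conf = yaml.safe_load(CONFIG_YAML)["provenance"]
archive = get_archive(**conf["archive"])
# get_provenance objects are used as context managers, as in the CLI commands.
with get_provenance(**conf["storage"]) as provenance:
    ...  # e.g. revision_add(provenance, archive, [...])
```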
If no configuration file is specified, use the following default configuration:: \b {yaml.dump(DEFAULT_CONFIG)}""" PROVENANCE_HELP = f"""Software Heritage provenance index database tools {CONFIG_FILE_HELP} """ @swh_cli_group.group( name="provenance", context_settings=CONTEXT_SETTINGS, help=PROVENANCE_HELP ) @click.option( "-C", "--config-file", default=None, type=click.Path(exists=True, dir_okay=False, path_type=str), help="""YAML configuration file.""", ) @click.option( "-P", "--profile", default=None, type=click.Path(exists=False, dir_okay=False, path_type=str), help="""Enable profiling to specified file.""", ) @click.pass_context def cli(ctx: click.core.Context, config_file: Optional[str], profile: str) -> None: if ( config_file is None and DEFAULT_PATH is not None and config.config_exists(DEFAULT_PATH) ): config_file = DEFAULT_PATH if config_file is None: conf = DEFAULT_CONFIG else: # read_raw_config do not fail on ENOENT if not os.path.exists(config_file): raise FileNotFoundError(config_file) conf = yaml.safe_load(open(config_file, "rb")) ctx.ensure_object(dict) ctx.obj["config"] = conf if profile: import atexit import cProfile print("Profiling...") pr = cProfile.Profile() pr.enable() def exit() -> None: pr.disable() pr.dump_stats(profile) atexit.register(exit) @cli.group(name="origin") @click.pass_context def origin(ctx: click.core.Context): from . import get_provenance from .archive import get_archive archive = get_archive(**ctx.obj["config"]["provenance"]["archive"]) provenance = get_provenance(**ctx.obj["config"]["provenance"]["storage"]) ctx.obj["provenance"] = provenance ctx.obj["archive"] = archive @origin.command(name="from-csv") @click.argument("filename", type=click.Path(exists=True)) @click.option( "-l", "--limit", type=int, help="""Limit the amount of entries (origins) to read from the input file.""", ) @click.pass_context def origin_from_csv(ctx: click.core.Context, filename: str, limit: Optional[int]): - from .origin import CSVOriginIterator, origin_add + from swh.provenance.algos.origin import CSVOriginIterator, origin_add provenance = ctx.obj["provenance"] archive = ctx.obj["archive"] origins_provider = generate_origin_tuples(filename) origins = CSVOriginIterator(origins_provider, limit=limit) with provenance: for origin in origins: origin_add(provenance, archive, [origin]) @origin.command(name="from-journal") @click.pass_context def origin_from_journal(ctx: click.core.Context): from swh.journal.client import get_journal_client from .journal_client import process_journal_origins provenance = ctx.obj["provenance"] archive = ctx.obj["archive"] journal_cfg = ctx.obj["config"].get("journal_client", {}) worker_fn = partial( process_journal_origins, archive=archive, provenance=provenance, ) cls = journal_cfg.pop("cls", None) or "kafka" client = get_journal_client( cls, **{ **journal_cfg, "object_types": ["origin_visit_status"], }, ) if notify: notify("READY=1") try: with provenance: client.process(worker_fn) except KeyboardInterrupt: ctx.exit(0) else: print("Done.") finally: if notify: notify("STOPPING=1") client.close() @cli.group(name="revision") @click.pass_context def revision(ctx: click.core.Context): from . 
import get_provenance from .archive import get_archive archive = get_archive(**ctx.obj["config"]["provenance"]["archive"]) provenance = get_provenance(**ctx.obj["config"]["provenance"]["storage"]) ctx.obj["provenance"] = provenance ctx.obj["archive"] = archive @revision.command(name="from-csv") @click.argument("filename", type=click.Path(exists=True)) @click.option( "-a", "--track-all", default=True, type=bool, help="""Index all occurrences of files in the development history.""", ) @click.option( "-f", "--flatten", default=True, type=bool, help="""Create flat models for directories in the isochrone frontier.""", ) @click.option( "-l", "--limit", type=int, help="""Limit the amount of entries (revisions) to read from the input file.""", ) @click.option( "-m", "--min-depth", default=1, type=int, help="""Set minimum depth (in the directory tree) at which an isochrone """ """frontier can be defined.""", ) @click.option( "-r", "--reuse", default=True, type=bool, help="""Prioritize the usage of previously defined isochrone frontiers """ """whenever possible.""", ) @click.option( "-s", "--min-size", default=0, type=int, help="""Set the minimum size (in bytes) of files to be indexed. """ """Any smaller file will be ignored.""", ) @click.option( "-d", "--max-directory-size", default=0, type=int, help="""Set the maximum recursive directory size of revisions to be indexed.""", ) @click.pass_context def revision_from_csv( ctx: click.core.Context, filename: str, track_all: bool, flatten: bool, limit: Optional[int], min_depth: int, reuse: bool, min_size: int, max_directory_size: int, ) -> None: - from .revision import CSVRevisionIterator, revision_add + from swh.provenance.algos.revision import CSVRevisionIterator, revision_add provenance = ctx.obj["provenance"] archive = ctx.obj["archive"] revisions_provider = generate_revision_tuples(filename) revisions = CSVRevisionIterator(revisions_provider, limit=limit) with provenance: for revision in revisions: revision_add( provenance, archive, [revision], trackall=track_all, flatten=flatten, lower=reuse, mindepth=min_depth, minsize=min_size, max_directory_size=max_directory_size, ) @revision.command(name="from-journal") @click.option( "-a", "--track-all", default=True, type=bool, help="""Index all occurrences of files in the development history.""", ) @click.option( "-f", "--flatten", default=True, type=bool, help="""Create flat models for directories in the isochrone frontier.""", ) @click.option( "-l", "--limit", type=int, help="""Limit the amount of entries (revisions) to read from the input file.""", ) @click.option( "-m", "--min-depth", default=1, type=int, help="""Set minimum depth (in the directory tree) at which an isochrone """ """frontier can be defined.""", ) @click.option( "-r", "--reuse", default=True, type=bool, help="""Prioritize the usage of previously defined isochrone frontiers """ """whenever possible.""", ) @click.option( "-s", "--min-size", default=0, type=int, help="""Set the minimum size (in bytes) of files to be indexed. 
""" """Any smaller file will be ignored.""", ) @click.option( "-d", "--max-directory-size", default=0, type=int, help="""Set the maximum recursive directory size of revisions to be indexed.""", ) @click.pass_context def revision_from_journal( ctx: click.core.Context, track_all: bool, flatten: bool, limit: Optional[int], min_depth: int, reuse: bool, min_size: int, max_directory_size: int, ) -> None: from swh.journal.client import get_journal_client from .journal_client import process_journal_revisions provenance = ctx.obj["provenance"] archive = ctx.obj["archive"] journal_cfg = ctx.obj["config"].get("journal_client", {}) worker_fn = partial( process_journal_revisions, archive=archive, provenance=provenance, minsize=min_size, max_directory_size=max_directory_size, ) cls = journal_cfg.pop("cls", None) or "kafka" client = get_journal_client( cls, **{ **journal_cfg, "object_types": ["revision"], }, ) if notify: notify("READY=1") try: with provenance: client.process(worker_fn) except KeyboardInterrupt: ctx.exit(0) else: print("Done.") finally: if notify: notify("STOPPING=1") client.close() @cli.group(name="directory") @click.pass_context def directory(ctx: click.core.Context): from . import get_provenance from .archive import get_archive archive = get_archive(**ctx.obj["config"]["provenance"]["archive"]) provenance = get_provenance(**ctx.obj["config"]["provenance"]["storage"]) ctx.obj["provenance"] = provenance ctx.obj["archive"] = archive @directory.command(name="flatten") @click.option( "--range-from", type=str, help="start ID of the range of directories to flatten" ) @click.option( "--range-to", type=str, help="stop ID of the range of directories to flatten" ) @click.option( "-s", "--min-size", default=0, type=int, help="""Set the minimum size (in bytes) of files to be indexed. Any smaller file will be ignored.""", ) @click.pass_context def directory_flatten(ctx: click.core.Context, range_from, range_to, min_size): - from swh.provenance.directory import directory_flatten_range + from swh.provenance.algos.directory import directory_flatten_range provenance = ctx.obj["provenance"] archive = ctx.obj["archive"] directory_flatten_range( provenance, archive, hash_to_bytes(range_from), hash_to_bytes(range_to), min_size, ) # old (deprecated) commands @cli.command(name="iter-frontiers") @click.argument("filename") @click.option( "-l", "--limit", type=int, help="""Limit the amount of entries (directories) to read from the input file.""", ) @click.option( "-s", "--min-size", default=0, type=int, help="""Set the minimum size (in bytes) of files to be indexed. """ """Any smaller file will be ignored.""", ) @click.pass_context def iter_frontiers( ctx: click.core.Context, filename: str, limit: Optional[int], min_size: int, ) -> None: """Process a provided list of directories in the isochrone frontier.""" - from . 
import get_provenance - from .archive import get_archive - from .directory import CSVDirectoryIterator, directory_add + from swh.provenance import get_provenance + from swh.provenance.algos.directory import CSVDirectoryIterator, directory_add + from swh.provenance.archive import get_archive archive = get_archive(**ctx.obj["config"]["provenance"]["archive"]) directories_provider = generate_directory_ids(filename) directories = CSVDirectoryIterator(directories_provider, limit=limit) with get_provenance(**ctx.obj["config"]["provenance"]["storage"]) as provenance: for directory in directories: directory_add( provenance, archive, [directory], minsize=min_size, ) def generate_directory_ids( filename: str, ) -> Generator[Sha1Git, None, None]: for line in open(filename, "r"): if line.strip(): yield hash_to_bytes(line.strip()) @cli.command(name="iter-revisions") @click.argument("filename") @click.option( "-a", "--track-all", default=True, type=bool, help="""Index all occurrences of files in the development history.""", ) @click.option( "-f", "--flatten", default=True, type=bool, help="""Create flat models for directories in the isochrone frontier.""", ) @click.option( "-l", "--limit", type=int, help="""Limit the amount of entries (revisions) to read from the input file.""", ) @click.option( "-m", "--min-depth", default=1, type=int, help="""Set minimum depth (in the directory tree) at which an isochrone """ """frontier can be defined.""", ) @click.option( "-r", "--reuse", default=True, type=bool, help="""Prioritize the usage of previously defined isochrone frontiers """ """whenever possible.""", ) @click.option( "-s", "--min-size", default=0, type=int, help="""Set the minimum size (in bytes) of files to be indexed. """ """Any smaller file will be ignored.""", ) @click.pass_context def iter_revisions( ctx: click.core.Context, filename: str, track_all: bool, flatten: bool, limit: Optional[int], min_depth: int, reuse: bool, min_size: int, ) -> None: """Process a provided list of revisions.""" - from . import get_provenance - from .archive import get_archive - from .revision import CSVRevisionIterator, revision_add + from swh.provenance import get_provenance + from swh.provenance.algos.revision import CSVRevisionIterator, revision_add + from swh.provenance.archive import get_archive archive = get_archive(**ctx.obj["config"]["provenance"]["archive"]) revisions_provider = generate_revision_tuples(filename) revisions = CSVRevisionIterator(revisions_provider, limit=limit) with get_provenance(**ctx.obj["config"]["provenance"]["storage"]) as provenance: for revision in revisions: revision_add( provenance, archive, [revision], trackall=track_all, flatten=flatten, lower=reuse, mindepth=min_depth, minsize=min_size, ) def generate_revision_tuples( filename: str, ) -> Generator[Tuple[Sha1Git, datetime, Sha1Git], None, None]: for line in open(filename, "r"): if line.strip(): revision, date, root = line.strip().split(",") yield ( hash_to_bytes(revision), iso8601.parse_date(date, default_timezone=timezone.utc), hash_to_bytes(root), ) @cli.command(name="iter-origins") @click.argument("filename") @click.option( "-l", "--limit", type=int, help="""Limit the amount of entries (origins) to read from the input file.""", ) @click.pass_context @deprecated(version="0.0.1", reason="Use `swh provenance origin from-csv` instead") def iter_origins(ctx: click.core.Context, filename: str, limit: Optional[int]) -> None: """Process a provided list of origins.""" - from . 
import get_provenance - from .archive import get_archive - from .origin import CSVOriginIterator, origin_add + from swh.provenance import get_provenance + from swh.provenance.algos.origin import CSVOriginIterator, origin_add + from swh.provenance.archive import get_archive archive = get_archive(**ctx.obj["config"]["provenance"]["archive"]) origins_provider = generate_origin_tuples(filename) origins = CSVOriginIterator(origins_provider, limit=limit) with get_provenance(**ctx.obj["config"]["provenance"]["storage"]) as provenance: for origin in origins: origin_add(provenance, archive, [origin]) def generate_origin_tuples(filename: str) -> Generator[Tuple[str, bytes], None, None]: for line in open(filename, "r"): if line.strip(): url, snapshot = line.strip().split(",") yield (url, hash_to_bytes(snapshot)) @cli.command(name="find-first") @click.argument("swhid") @click.pass_context def find_first(ctx: click.core.Context, swhid: str) -> None: """Find first occurrence of the requested blob.""" from . import get_provenance with get_provenance(**ctx.obj["config"]["provenance"]["storage"]) as provenance: occur = provenance.content_find_first(hash_to_bytes(swhid)) if occur is not None: print( f"swh:1:cnt:{hash_to_hex(occur.content)}, " f"swh:1:rev:{hash_to_hex(occur.revision)}, " f"{occur.date}, " f"{occur.origin}, " f"{os.fsdecode(occur.path)}" ) else: print(f"Cannot find a content with the id {swhid}") @cli.command(name="find-all") @click.argument("swhid") @click.option( "-l", "--limit", type=int, help="""Limit the amount results to be retrieved.""" ) @click.pass_context def find_all(ctx: click.core.Context, swhid: str, limit: Optional[int]) -> None: """Find all occurrences of the requested blob.""" from . import get_provenance with get_provenance(**ctx.obj["config"]["provenance"]["storage"]) as provenance: for occur in provenance.content_find_all(hash_to_bytes(swhid), limit=limit): print( f"swh:1:cnt:{hash_to_hex(occur.content)}, " f"swh:1:rev:{hash_to_hex(occur.revision)}, " f"{occur.date}, " f"{occur.origin}, " f"{os.fsdecode(occur.path)}" ) diff --git a/swh/provenance/journal_client.py b/swh/provenance/journal_client.py index 9332005..cfb9135 100644 --- a/swh/provenance/journal_client.py +++ b/swh/provenance/journal_client.py @@ -1,69 +1,69 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime try: from systemd.daemon import notify except ImportError: notify = None import sentry_sdk from swh.model.model import TimestampWithTimezone +from swh.provenance.algos.origin import origin_add +from swh.provenance.algos.revision import revision_add from swh.provenance.archive import ArchiveInterface from swh.provenance.interface import ProvenanceInterface from swh.provenance.model import OriginEntry, RevisionEntry -from swh.provenance.origin import origin_add -from swh.provenance.revision import revision_add EPOCH = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc) def process_journal_origins( messages, *, provenance: ProvenanceInterface, archive: ArchiveInterface, **cfg ) -> None: """Worker function for `JournalClient.process(worker_fn)`.""" assert set(messages) == {"origin_visit_status"}, set(messages) origin_entries = [ OriginEntry(url=visit["origin"], snapshot=visit["snapshot"]) for visit in messages["origin_visit_status"] if visit["snapshot"] is not None ] if origin_entries: 
origin_add(provenance, archive, origin_entries, **cfg) if notify: notify("WATCHDOG=1") def process_journal_revisions( messages, *, provenance: ProvenanceInterface, archive: ArchiveInterface, **cfg ) -> None: """Worker function for `JournalClient.process(worker_fn)`.""" assert set(messages) == {"revision"}, set(messages) revisions = [] for rev in messages["revision"]: if rev["date"] is None: continue try: date = TimestampWithTimezone.from_dict(rev["date"]).to_datetime() except Exception: sentry_sdk.capture_exception() continue if date <= EPOCH: continue revisions.append( RevisionEntry( id=rev["id"], root=rev["directory"], date=date, ) ) if revisions: revision_add(provenance, archive, revisions, **cfg) if notify: notify("WATCHDOG=1") diff --git a/swh/provenance/tests/test_consistency.py b/swh/provenance/tests/test_consistency.py index 7653c95..d14be0a 100644 --- a/swh/provenance/tests/test_consistency.py +++ b/swh/provenance/tests/test_consistency.py @@ -1,92 +1,92 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.model.hashutil import hash_to_bytes +from swh.provenance.algos.revision import revision_add from swh.provenance.archive import ArchiveInterface from swh.provenance.interface import ProvenanceInterface from swh.provenance.model import RevisionEntry -from swh.provenance.revision import revision_add from swh.provenance.storage.interface import DirectoryData, ProvenanceResult from swh.provenance.tests.conftest import fill_storage, load_repo_data, ts2dt def test_consistency( provenance: ProvenanceInterface, archive: ArchiveInterface, ) -> None: # read data/README.md for more details on how these datasets are generated data = load_repo_data("cmdbts2") fill_storage(archive.storage, data) revisions = {rev["id"]: rev for rev in data["revision"]} # Process R00 first as expected rev_00 = revisions[hash_to_bytes("c0d8929936631ecbcf9147be6b8aa13b13b014e4")] r00 = RevisionEntry( id=rev_00["id"], date=ts2dt(rev_00["date"]), root=rev_00["directory"], ) revision_add(provenance, archive, [r00]) # Register contents A/B/C/b from R01 in the storage to simulate a crash rev_01 = revisions[hash_to_bytes("1444db96cbd8cd791abe83527becee73d3c64e86")] r01 = RevisionEntry( id=rev_01["id"], date=ts2dt(rev_01["date"]), root=rev_01["directory"], ) assert r01.date is not None # for mypy cnt_b_sha1 = hash_to_bytes("50e9cdb03f9719261dd39d7f2920b906db3711a3") provenance.storage.content_add({cnt_b_sha1: r01.date}) # Process R02 (this should set a frontier in directory C) rev_02 = revisions[hash_to_bytes("0d45f1ee524db8f6f0b5a267afac4e733b4b2cee")] r02 = RevisionEntry( id=rev_02["id"], date=ts2dt(rev_02["date"]), root=rev_02["directory"], ) assert r02.date is not None # for mypy revision_add(provenance, archive, [r02]) dir_C_sha1 = hash_to_bytes("c9cabe7f49012e3fdef6ac6b929efb5654f583cf") assert provenance.storage.directory_get([dir_C_sha1]) == { dir_C_sha1: DirectoryData(r01.date, True) } assert provenance.content_find_first(cnt_b_sha1) is None # No first occurrence assert set(provenance.content_find_all(cnt_b_sha1)) == { ProvenanceResult( content=cnt_b_sha1, revision=r02.id, date=r02.date, origin=None, path=b"A/B/C/b", ) } # Process R01 out of order (frontier in C should not be reused to guarantee that the # first occurrence of A/B/C/b is in the CNT_EARLY_IN_REV relation) revision_add(provenance, archive, 
[r01]) assert provenance.content_find_first(cnt_b_sha1) == ProvenanceResult( content=cnt_b_sha1, revision=r01.id, date=r01.date, origin=None, path=b"A/B/C/b" ) assert set(provenance.content_find_all(cnt_b_sha1)) == { ProvenanceResult( content=cnt_b_sha1, revision=r01.id, date=r01.date, origin=None, path=b"A/B/C/b", ), ProvenanceResult( content=cnt_b_sha1, revision=r02.id, date=r02.date, origin=None, path=b"A/B/C/b", ), } diff --git a/swh/provenance/tests/test_directory_flatten.py b/swh/provenance/tests/test_directory_flatten.py index 62541b2..4a62b86 100644 --- a/swh/provenance/tests/test_directory_flatten.py +++ b/swh/provenance/tests/test_directory_flatten.py @@ -1,101 +1,101 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timezone from typing import Tuple from swh.model.hashutil import hash_to_bytes +from swh.provenance.algos.directory import directory_add, directory_flatten_range from swh.provenance.archive import ArchiveInterface -from swh.provenance.directory import directory_add, directory_flatten_range from swh.provenance.interface import ProvenanceInterface from swh.provenance.model import DirectoryEntry, FileEntry from swh.provenance.storage.interface import DirectoryData, RelationData, RelationType from swh.provenance.tests.conftest import fill_storage, load_repo_data def prepare( provenance: ProvenanceInterface, archive: ArchiveInterface ) -> Tuple[datetime, DirectoryEntry, FileEntry, FileEntry]: """Prepare the provenance database with some content suitable for flattening tests""" # read data/README.md for more details on how these datasets are generated data = load_repo_data("cmdbts2") fill_storage(archive.storage, data) # just take a directory that is known to exists in cmdbts2 directory = DirectoryEntry( id=hash_to_bytes("48007c961cc734d1f63886d0413a6dc605e3e2ea") ) content1 = FileEntry( id=hash_to_bytes("20329687bb9c1231a7e05afe86160343ad49b494"), name=b"a" ) content2 = FileEntry( id=hash_to_bytes("50e9cdb03f9719261dd39d7f2920b906db3711a3"), name=b"b" ) date = datetime.fromtimestamp(1000000010, timezone.utc) # directory_add and the internal directory_flatten require the directory and its # content to be known by the provenance object. 
Otherwise, they do nothing provenance.directory_set_date_in_isochrone_frontier(directory, date) provenance.content_set_early_date(content1, date) provenance.content_set_early_date(content2, date) provenance.flush() assert provenance.storage.directory_get([directory.id]) == { directory.id: DirectoryData(date=date, flat=False) } assert provenance.storage.content_get([content1.id, content2.id]) == { content1.id: date, content2.id: date, } # this query forces the directory date to be retrieved from the storage and cached # (otherwise, the flush below won't update the directory flatten flag) flattenned = provenance.directory_already_flattenned(directory) assert flattenned is not None and not flattenned return date, directory, content1, content2 def test_directory_add( provenance: ProvenanceInterface, archive: ArchiveInterface, ) -> None: date, directory, content1, content2 = prepare(provenance, archive) # flatten the directory and check the expected result directory_add(provenance, archive, [directory]) assert provenance.storage.directory_get([directory.id]) == { directory.id: DirectoryData(date=date, flat=True) } assert provenance.storage.relation_get_all(RelationType.CNT_IN_DIR) == { content1.id: { RelationData(dst=directory.id, path=b"a"), RelationData(dst=directory.id, path=b"C/a"), }, content2.id: {RelationData(dst=directory.id, path=b"C/b")}, } def test_directory_flatten_range( provenance: ProvenanceInterface, archive: ArchiveInterface, ) -> None: date, directory, content1, content2 = prepare(provenance, archive) # flatten the directory and check the expected result directory_flatten_range(provenance, archive, directory.id[:-1], directory.id) assert provenance.storage.directory_get([directory.id]) == { directory.id: DirectoryData(date=date, flat=True) } assert provenance.storage.relation_get_all(RelationType.CNT_IN_DIR) == { content1.id: { RelationData(dst=directory.id, path=b"a"), RelationData(dst=directory.id, path=b"C/a"), }, content2.id: {RelationData(dst=directory.id, path=b"C/b")}, } diff --git a/swh/provenance/tests/test_directory_iterator.py b/swh/provenance/tests/test_directory_iterator.py index 0711e14..9601aa5 100644 --- a/swh/provenance/tests/test_directory_iterator.py +++ b/swh/provenance/tests/test_directory_iterator.py @@ -1,29 +1,29 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest -from swh.provenance.directory import CSVDirectoryIterator +from swh.provenance.algos.directory import CSVDirectoryIterator from swh.provenance.tests.conftest import fill_storage, load_repo_data from swh.storage.interface import StorageInterface @pytest.mark.parametrize( "repo", ( "cmdbts2", "out-of-order", ), ) def test_revision_iterator(swh_storage: StorageInterface, repo: str) -> None: """Test CSVDirectoryIterator""" data = load_repo_data(repo) fill_storage(swh_storage, data) directories_ids = [dir["id"] for dir in data["directory"]] directories = list(CSVDirectoryIterator(directories_ids)) assert directories assert len(directories) == len(data["directory"]) diff --git a/swh/provenance/tests/test_history_graph.py b/swh/provenance/tests/test_history_graph.py index eeca3b6..5612c0c 100644 --- a/swh/provenance/tests/test_history_graph.py +++ b/swh/provenance/tests/test_history_graph.py @@ -1,55 +1,54 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS 
file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest import yaml from swh.model.hashutil import hash_to_bytes +from swh.provenance.algos.origin import HistoryGraph, origin_add_revision from swh.provenance.archive import ArchiveInterface -from swh.provenance.graph import HistoryGraph from swh.provenance.interface import ProvenanceInterface from swh.provenance.model import OriginEntry, RevisionEntry -from swh.provenance.origin import origin_add_revision from swh.provenance.tests.conftest import fill_storage, get_datafile, load_repo_data @pytest.mark.origin_layer @pytest.mark.parametrize( "repo, visit", (("with-merges", "visits-01"),), ) @pytest.mark.parametrize("batch", (True, False)) def test_history_graph( provenance: ProvenanceInterface, archive: ArchiveInterface, repo: str, visit: str, batch: bool, ) -> None: # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) fill_storage(archive.storage, data) filename = f"history_graphs_{repo}_{visit}.yaml" with open(get_datafile(filename)) as file: for expected in yaml.full_load(file): entry = OriginEntry(expected["origin"], hash_to_bytes(expected["snapshot"])) provenance.origin_add(entry) for expected_graph_as_dict in expected["graphs"]: print("Expected graph:", expected_graph_as_dict) computed_graph = HistoryGraph( archive, RevisionEntry(hash_to_bytes(expected_graph_as_dict["head"])), ) print("Computed graph:", computed_graph.as_dict()) assert computed_graph.as_dict() == expected_graph_as_dict origin_add_revision(provenance, entry, computed_graph) if not batch: provenance.flush() diff --git a/swh/provenance/tests/test_isochrone_graph.py b/swh/provenance/tests/test_isochrone_graph.py index aac83e7..81202fc 100644 --- a/swh/provenance/tests/test_isochrone_graph.py +++ b/swh/provenance/tests/test_isochrone_graph.py @@ -1,148 +1,152 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from copy import deepcopy from datetime import datetime, timezone from typing import Any, Dict import pytest import yaml from swh.model.hashutil import hash_to_bytes +from swh.provenance.algos.isochrone_graph import ( + DirectoryTooLarge, + IsochroneNode, + build_isochrone_graph, +) +from swh.provenance.algos.revision import revision_add from swh.provenance.archive import ArchiveInterface -from swh.provenance.graph import DirectoryTooLarge, IsochroneNode, build_isochrone_graph from swh.provenance.interface import ProvenanceInterface from swh.provenance.model import DirectoryEntry, RevisionEntry -from swh.provenance.revision import revision_add from swh.provenance.tests.conftest import ( fill_storage, get_datafile, load_repo_data, ts2dt, ) def isochrone_graph_from_dict(d: Dict[str, Any], depth: int = 0) -> IsochroneNode: """Takes a dictionary representing a tree of IsochroneNode objects, and recursively builds the corresponding graph.""" d = deepcopy(d) d["entry"]["id"] = hash_to_bytes(d["entry"]["id"]) d["entry"]["name"] = bytes(d["entry"]["name"], encoding="utf-8") dbdate = d.get("dbdate", None) if dbdate is not None: dbdate = datetime.fromtimestamp(d["dbdate"], timezone.utc) children = d.get("children", []) node = IsochroneNode( entry=DirectoryEntry(**d["entry"]), dbdate=dbdate, depth=depth, 
) node.maxdate = datetime.fromtimestamp(d["maxdate"], timezone.utc) node.invalid = d.get("invalid", False) node.path = bytes(d["path"], encoding="utf-8") node.children = set( isochrone_graph_from_dict(child, depth=depth + 1) for child in children ) return node @pytest.mark.parametrize( "repo, lower, mindepth", ( ("cmdbts2", True, 1), ("cmdbts2", False, 1), ("cmdbts2", True, 2), ("cmdbts2", False, 2), ("out-of-order", True, 1), ), ) @pytest.mark.parametrize("batch", (True, False)) def test_isochrone_graph( provenance: ProvenanceInterface, archive: ArchiveInterface, repo: str, lower: bool, mindepth: int, batch: bool, ) -> None: # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) fill_storage(archive.storage, data) revisions = {rev["id"]: rev for rev in data["revision"]} filename = f"graphs_{repo}_{'lower' if lower else 'upper'}_{mindepth}.yaml" with open(get_datafile(filename)) as file: for expected in yaml.full_load(file): print("# Processing revision", expected["rev"]) revision = revisions[hash_to_bytes(expected["rev"])] entry = RevisionEntry( id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], ) expected_graph = isochrone_graph_from_dict(expected["graph"]) print("Expected graph:", expected_graph) # Create graph for current revision and check it has the expected structure. assert entry.root is not None computed_graph = build_isochrone_graph( provenance, archive, entry, DirectoryEntry(entry.root), ) print("Computed graph:", computed_graph) assert computed_graph == expected_graph # Add current revision so that provenance info is kept up to date for the # following ones. revision_add( provenance, archive, [entry], lower=lower, mindepth=mindepth, commit=not batch, ) def test_isochrone_graph_max_dir_size( provenance: ProvenanceInterface, archive: ArchiveInterface, ): data = load_repo_data("git-bomb") fill_storage(archive.storage, data) rev = archive.storage.revision_get( [hash_to_bytes("7af99c9e7d4768fa681f4fe4ff61259794cf719b")] )[0] assert rev is not None assert rev.date is not None with pytest.raises(DirectoryTooLarge, match="Max directory size exceeded"): build_isochrone_graph( provenance, archive, RevisionEntry(id=rev.id, date=rev.date.to_datetime(), root=rev.directory), DirectoryEntry(rev.directory), max_directory_size=1000, ) pass # from this directory, there should be only ~1k recursive entries, so the # call to build_isochrone_graph with max_directory_size=1200 should succeed dir_id = hash_to_bytes("3e50041e82b225ca9e9b2641548b0c1b81eb971b") build_isochrone_graph( provenance, archive, RevisionEntry(id=rev.id, date=rev.date.to_datetime(), root=dir_id), DirectoryEntry(dir_id), max_directory_size=1200, ) diff --git a/swh/provenance/tests/test_origin_iterator.py b/swh/provenance/tests/test_origin_iterator.py index 581aaed..6492109 100644 --- a/swh/provenance/tests/test_origin_iterator.py +++ b/swh/provenance/tests/test_origin_iterator.py @@ -1,47 +1,47 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest -from swh.provenance.origin import CSVOriginIterator +from swh.provenance.algos.origin import CSVOriginIterator from swh.provenance.tests.conftest import fill_storage, load_repo_data from swh.storage.algos.origin import ( iter_origin_visit_statuses, iter_origin_visits, iter_origins, ) from 
swh.storage.interface import StorageInterface @pytest.mark.origin_layer @pytest.mark.parametrize( "repo", ( "cmdbts2", "out-of-order", ), ) def test_origin_iterator(swh_storage: StorageInterface, repo: str) -> None: """Test CSVOriginIterator""" data = load_repo_data(repo) fill_storage(swh_storage, data) origins_csv = [] for origin in iter_origins(swh_storage): for visit in iter_origin_visits(swh_storage, origin.url): if visit.visit is not None: for status in iter_origin_visit_statuses( swh_storage, origin.url, visit.visit ): if status.snapshot is not None: origins_csv.append((status.origin, status.snapshot)) origins = list(CSVOriginIterator(origins_csv)) assert origins # there can be more origins, depending on the additional extra visits.yaml # file used during dataset generation (see data/generate_storage_from_git) assert len(origins) >= len(data["origin"]) # but we can check it's a subset assert set(o.url for o in origins) <= set(o["url"] for o in data["origin"]) diff --git a/swh/provenance/tests/test_origin_revision_layer.py b/swh/provenance/tests/test_origin_revision_layer.py index a0ca437..1948fed 100644 --- a/swh/provenance/tests/test_origin_revision_layer.py +++ b/swh/provenance/tests/test_origin_revision_layer.py @@ -1,196 +1,196 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import re from typing import Any, Dict, Iterable, Iterator, List, Set import pytest from typing_extensions import TypedDict from swh.model.hashutil import hash_to_bytes from swh.model.model import Sha1Git +from swh.provenance.algos.origin import origin_add from swh.provenance.archive import ArchiveInterface from swh.provenance.interface import ProvenanceInterface from swh.provenance.model import OriginEntry -from swh.provenance.origin import origin_add from swh.provenance.storage.interface import EntityType, RelationType from swh.provenance.tests.conftest import fill_storage, get_datafile, load_repo_data class SynthRelation(TypedDict): src: Sha1Git dst: Sha1Git name: str class SynthOrigin(TypedDict): sha1: Sha1Git url: str snap: Sha1Git O_R: List[SynthRelation] R_R: List[SynthRelation] def synthetic_origin_revision_result(filename: str) -> Iterator[SynthOrigin]: """Generates dict representations of synthetic origin visits found in the synthetic file (from the data/ directory) given as argument of the generator. Generated SynthOrigin (typed dict) with the following elements: "sha1": (Sha1Git) sha1 of the origin, "url": (str) url of the origin, "snap": (Sha1Git) sha1 of the visit's snapshot, "O_R": (list) new O-R relations added by this origin visit "R_R": (list) new R-R relations added by this origin visit Each relation above is a SynthRelation typed dict with: "src": (Sha1Git) sha1 of the source of the relation "dst": (Sha1Git) sha1 of the destination of the relation """ with open(get_datafile(filename), "r") as fobj: yield from _parse_synthetic_origin_revision_file(fobj) def _parse_synthetic_origin_revision_file(fobj: Iterable[str]) -> Iterator[SynthOrigin]: """Read a 'synthetic' file and generate a dict representation of the synthetic origin visit for each snapshot listed in the synthetic file. 
""" regs = [ "(?P[^ ]+)?", "(?P[^| ]*)", "(?PR[0-9]{2,4})?", "(?P[ORS]) (?P[0-9a-f]{40})", ] regex = re.compile("^ *" + r" *[|] *".join(regs) + r" *(#.*)?$") current_org: List[dict] = [] for m in (regex.match(line) for line in fobj): if m: d = m.groupdict() if d["url"]: if current_org: yield _mk_synth_org(current_org) current_org.clear() current_org.append(d) if current_org: yield _mk_synth_org(current_org) def _mk_synth_org(synth_org: List[Dict[str, str]]) -> SynthOrigin: assert synth_org[0]["type"] == "O" assert synth_org[1]["type"] == "S" org = SynthOrigin( sha1=hash_to_bytes(synth_org[0]["sha1"]), url=synth_org[0]["url"], snap=hash_to_bytes(synth_org[1]["sha1"]), O_R=[], R_R=[], ) for row in synth_org[2:]: if row["reltype"] == "O-R": assert row["type"] == "R" org["O_R"].append( SynthRelation( src=org["sha1"], dst=hash_to_bytes(row["sha1"]), name=row["revname"], ) ) elif row["reltype"] == "R-R": assert row["type"] == "R" org["R_R"].append( SynthRelation( src=org["O_R"][-1]["dst"], dst=hash_to_bytes(row["sha1"]), name=row["revname"], ) ) return org @pytest.mark.origin_layer @pytest.mark.parametrize( "repo, visit", (("with-merges", "visits-01"),), ) def test_origin_revision_layer( provenance: ProvenanceInterface, archive: ArchiveInterface, repo: str, visit: str, ) -> None: # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) fill_storage(archive.storage, data) syntheticfile = get_datafile(f"origin-revision_{repo}_{visit}.txt") origins = [ {"url": status["origin"], "snap": status["snapshot"]} for status in data["origin_visit_status"] if status["snapshot"] is not None ] rows: Dict[str, Set[Any]] = { "origin": set(), "revision_in_origin": set(), "revision_before_revision": set(), "revision": set(), } for synth_org in synthetic_origin_revision_result(syntheticfile): for origin in ( org for org in origins if org["url"] == synth_org["url"] and org["snap"] == synth_org["snap"] ): entry = OriginEntry(url=origin["url"], snapshot=origin["snap"]) origin_add(provenance, archive, [entry]) # each "entry" in the synth file is one new origin visit rows["origin"].add(synth_org["sha1"]) assert rows["origin"] == provenance.storage.entity_get_all( EntityType.ORIGIN ), synth_org["url"] # check the url of the origin assert ( provenance.storage.origin_get([synth_org["sha1"]])[synth_org["sha1"]] == synth_org["url"] ), synth_org["snap"] # this origin visit might have added new revision objects rows["revision"] |= set(x["dst"] for x in synth_org["O_R"]) rows["revision"] |= set(x["dst"] for x in synth_org["R_R"]) assert rows["revision"] == provenance.storage.entity_get_all( EntityType.REVISION ), synth_org["snap"] # check for O-R (head) entries # these are added in the revision_in_origin relation rows["revision_in_origin"] |= set( (x["dst"], x["src"], None) for x in synth_org["O_R"] ) assert rows["revision_in_origin"] == { (src, rel.dst, rel.path) for src, rels in provenance.storage.relation_get_all( RelationType.REV_IN_ORG ).items() for rel in rels }, synth_org["snap"] # check for R-R entries # these are added in the revision_before_revision relation rows["revision_before_revision"] |= set( (x["dst"], x["src"], None) for x in synth_org["R_R"] ) assert rows["revision_before_revision"] == { (src, rel.dst, rel.path) for src, rels in provenance.storage.relation_get_all( RelationType.REV_BEFORE_REV ).items() for rel in rels }, synth_org["snap"] diff --git a/swh/provenance/tests/test_provenance_storage.py b/swh/provenance/tests/test_provenance_storage.py index 
0dcb7d5..60c5126 100644 --- a/swh/provenance/tests/test_provenance_storage.py +++ b/swh/provenance/tests/test_provenance_storage.py @@ -1,521 +1,521 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timezone import inspect import os from typing import Any, Dict, Iterable, Optional, Set, Tuple import pytest from swh.model.hashutil import hash_to_bytes from swh.model.model import Origin, Sha1Git +from swh.provenance.algos.origin import origin_add +from swh.provenance.algos.revision import revision_add from swh.provenance.archive import ArchiveInterface from swh.provenance.interface import ProvenanceInterface from swh.provenance.model import OriginEntry, RevisionEntry -from swh.provenance.origin import origin_add from swh.provenance.provenance import Provenance -from swh.provenance.revision import revision_add from swh.provenance.storage.interface import ( DirectoryData, EntityType, ProvenanceResult, ProvenanceStorageInterface, RelationData, RelationType, RevisionData, ) from swh.provenance.tests.conftest import fill_storage, load_repo_data, ts2dt class TestProvenanceStorage: def test_provenance_storage_content( self, provenance_storage: ProvenanceStorageInterface, ) -> None: """Tests content methods for every `ProvenanceStorageInterface` implementation.""" # Read data/README.md for more details on how these datasets are generated. data = load_repo_data("cmdbts2") # Add all content present in the current repo to the storage, just assigning their # creation dates. Then check that the returned results when querying are the same. cnt_dates = { cnt["sha1_git"]: cnt["ctime"] for idx, cnt in enumerate(data["content"]) } assert provenance_storage.content_add(cnt_dates) assert provenance_storage.content_get(set(cnt_dates.keys())) == cnt_dates assert provenance_storage.entity_get_all(EntityType.CONTENT) == set( cnt_dates.keys() ) def test_provenance_storage_directory( self, provenance_storage: ProvenanceStorageInterface, ) -> None: """Tests directory methods for every `ProvenanceStorageInterface` implementation.""" # Read data/README.md for more details on how these datasets are generated. data = load_repo_data("cmdbts2") # Of all directories present in the current repo, only assign a date to those # containing blobs (picking the max date among the available ones). Then check that # the returned results when querying are the same. 
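        # Shape of the mapping assembled below (sha1_git keys shown only schematically):
        #     {dir_sha1_git: DirectoryData(date=<max ctime of the blobs it contains>, flat=...)}
        # getmaxdate() provides the date; the flat flag comes from flat_values[idx % 2]
        # so that both flat=True and flat=False rows are exercised by directory_add()
        # and read back unchanged by directory_get().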
def getmaxdate( directory: Dict[str, Any], contents: Iterable[Dict[str, Any]] ) -> Optional[datetime]: dates = [ content["ctime"] for entry in directory["entries"] for content in contents if entry["type"] == "file" and entry["target"] == content["sha1_git"] ] return max(dates) if dates else None flat_values = (False, True) dir_dates = {} for idx, dir in enumerate(data["directory"]): date = getmaxdate(dir, data["content"]) if date is not None: dir_dates[dir["id"]] = DirectoryData( date=date, flat=flat_values[idx % 2] ) assert provenance_storage.directory_add(dir_dates) assert provenance_storage.directory_get(set(dir_dates.keys())) == dir_dates assert provenance_storage.entity_get_all(EntityType.DIRECTORY) == set( dir_dates.keys() ) def test_provenance_storage_location( self, provenance_storage: ProvenanceStorageInterface, ) -> None: """Tests location methods for every `ProvenanceStorageInterface` implementation.""" # Read data/README.md for more details on how these datasets are generated. data = load_repo_data("cmdbts2") # Add all names of entries present in the directories of the current repo as paths # to the storage. Then check that the returned results when querying are the same. paths = {entry["name"] for dir in data["directory"] for entry in dir["entries"]} assert provenance_storage.location_add(paths) if provenance_storage.with_path(): assert provenance_storage.location_get_all() == paths else: assert provenance_storage.location_get_all() == set() @pytest.mark.origin_layer def test_provenance_storage_origin( self, provenance_storage: ProvenanceStorageInterface, ) -> None: """Tests origin methods for every `ProvenanceStorageInterface` implementation.""" # Read data/README.md for more details on how these datasets are generated. data = load_repo_data("cmdbts2") # Test origin methods. # Add all origins present in the current repo to the storage. Then check that the # returned results when querying are the same. orgs = {Origin(url=org["url"]).id: org["url"] for org in data["origin"]} assert orgs assert provenance_storage.origin_add(orgs) assert provenance_storage.origin_get(set(orgs.keys())) == orgs assert provenance_storage.entity_get_all(EntityType.ORIGIN) == set(orgs.keys()) def test_provenance_storage_revision( self, provenance_storage: ProvenanceStorageInterface, ) -> None: """Tests revision methods for every `ProvenanceStorageInterface` implementation.""" # Read data/README.md for more details on how these datasets are generated. data = load_repo_data("cmdbts2") # Test revision methods. # Add all revisions present in the current repo to the storage, assigning their # dates and an arbitrary origin to each one. Then check that the returned results # when querying are the same. origin = Origin(url=next(iter(data["origin"]))["url"]) # Origin must be inserted in advance. 
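        # The RevisionData entries built below reference this origin by id, while
        # revision_add() itself only writes revision rows; presumably the origin row
        # therefore has to exist beforehand, which is what the explicit origin_add()
        # call below ensures.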
assert provenance_storage.origin_add({origin.id: origin.url}) revs = {rev["id"] for idx, rev in enumerate(data["revision"]) if idx % 6 == 0} rev_data = { rev["id"]: RevisionData( date=ts2dt(rev["date"]) if idx % 2 != 0 else None, origin=origin.id if idx % 3 != 0 else None, ) for idx, rev in enumerate(data["revision"]) if idx % 6 != 0 } assert revs assert provenance_storage.revision_add(revs) assert provenance_storage.revision_add(rev_data) assert provenance_storage.revision_get(set(rev_data.keys())) == rev_data assert provenance_storage.entity_get_all(EntityType.REVISION) == revs | set( rev_data.keys() ) def test_provenance_storage_relation_revision_layer( self, provenance_storage: ProvenanceStorageInterface, ) -> None: """Tests relation methods for every `ProvenanceStorageInterface` implementation.""" # Read data/README.md for more details on how these datasets are generated. data = load_repo_data("cmdbts2") # Test content-in-revision relation. # Create flat models of every root directory for the revisions in the dataset. cnt_in_rev: Dict[Sha1Git, Set[RelationData]] = {} for rev in data["revision"]: root = next( subdir for subdir in data["directory"] if subdir["id"] == rev["directory"] ) for cnt, rel in dircontent(data, rev["id"], root): cnt_in_rev.setdefault(cnt, set()).add(rel) relation_add_and_compare_result( provenance_storage, RelationType.CNT_EARLY_IN_REV, cnt_in_rev ) # Test content-in-directory relation. # Create flat models for every directory in the dataset. cnt_in_dir: Dict[Sha1Git, Set[RelationData]] = {} for dir in data["directory"]: for cnt, rel in dircontent(data, dir["id"], dir): cnt_in_dir.setdefault(cnt, set()).add(rel) relation_add_and_compare_result( provenance_storage, RelationType.CNT_IN_DIR, cnt_in_dir ) # Test directory-in-revision relation. # Add root directories to their corresponding revision in the dataset. dir_in_rev: Dict[Sha1Git, Set[RelationData]] = {} for rev in data["revision"]: dir_in_rev.setdefault(rev["directory"], set()).add( RelationData(dst=rev["id"], path=b".") ) relation_add_and_compare_result( provenance_storage, RelationType.DIR_IN_REV, dir_in_rev ) @pytest.mark.origin_layer def test_provenance_storage_relation_orign_layer( self, provenance_storage: ProvenanceStorageInterface, ) -> None: """Tests relation methods for every `ProvenanceStorageInterface` implementation.""" # Read data/README.md for more details on how these datasets are generated. data = load_repo_data("cmdbts2") # Test revision-in-origin relation. # Origins must be inserted in advance (cannot be done by `entity_add` inside # `relation_add_and_compare_result`). orgs = {Origin(url=org["url"]).id: org["url"] for org in data["origin"]} assert provenance_storage.origin_add(orgs) # Add all revisions that are head of some snapshot branch to the corresponding # origin. rev_in_org: Dict[Sha1Git, Set[RelationData]] = {} for status in data["origin_visit_status"]: if status["snapshot"] is not None: for snapshot in data["snapshot"]: if snapshot["id"] == status["snapshot"]: for branch in snapshot["branches"].values(): if branch["target_type"] == "revision": rev_in_org.setdefault(branch["target"], set()).add( RelationData( dst=Origin(url=status["origin"]).id, path=None, ) ) relation_add_and_compare_result( provenance_storage, RelationType.REV_IN_ORG, rev_in_org ) # Test revision-before-revision relation. # For each revision in the data set add an entry for each parent to the relation.
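        # Sketch of the mapping built below, with schematic sha1 values:
        #     {parent_sha1: {RelationData(dst=child_sha1, path=None), ...}}
        # i.e. one entry per (parent, child) edge of the revision history.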
rev_before_rev: Dict[Sha1Git, Set[RelationData]] = {} for rev in data["revision"]: for parent in rev["parents"]: rev_before_rev.setdefault(parent, set()).add( RelationData(dst=rev["id"], path=None) ) relation_add_and_compare_result( provenance_storage, RelationType.REV_BEFORE_REV, rev_before_rev ) def test_provenance_storage_find_revision_layer( self, provenance: ProvenanceInterface, provenance_storage: ProvenanceStorageInterface, archive: ArchiveInterface, ) -> None: """Tests `content_find_first` and `content_find_all` methods for every `ProvenanceStorageInterface` implementation. """ # Read data/README.md for more details on how these datasets are generated. data = load_repo_data("cmdbts2") fill_storage(archive.storage, data) # Test content_find_first and content_find_all, first only executing the # revision-content algorithm, then adding the origin-revision layer. def adapt_result( result: Optional[ProvenanceResult], with_path: bool ) -> Optional[ProvenanceResult]: if result is not None: return ProvenanceResult( result.content, result.revision, result.date, result.origin, result.path if with_path else b"", ) return result # Execute the revision-content algorithm on both storages. revisions = [ RevisionEntry(id=rev["id"], date=ts2dt(rev["date"]), root=rev["directory"]) for rev in data["revision"] ] revision_add(provenance, archive, revisions) revision_add(Provenance(provenance_storage), archive, revisions) assert adapt_result( ProvenanceResult( content=hash_to_bytes("20329687bb9c1231a7e05afe86160343ad49b494"), revision=hash_to_bytes("c0d8929936631ecbcf9147be6b8aa13b13b014e4"), date=datetime.fromtimestamp(1000000000.0, timezone.utc), origin=None, path=b"A/B/C/a", ), provenance_storage.with_path(), ) == provenance_storage.content_find_first( hash_to_bytes("20329687bb9c1231a7e05afe86160343ad49b494") ) for cnt in {cnt["sha1_git"] for cnt in data["content"]}: assert adapt_result( provenance.storage.content_find_first(cnt), provenance_storage.with_path(), ) == provenance_storage.content_find_first(cnt) assert { adapt_result(occur, provenance_storage.with_path()) for occur in provenance.storage.content_find_all(cnt) } == set(provenance_storage.content_find_all(cnt)) @pytest.mark.origin_layer def test_provenance_storage_find_origin_layer( self, provenance: ProvenanceInterface, provenance_storage: ProvenanceStorageInterface, archive: ArchiveInterface, ) -> None: """Tests `content_find_first` and `content_find_all` methods for every `ProvenanceStorageInterface` implementation. """ # Read data/README.md for more details on how these datasets are generated. data = load_repo_data("cmdbts2") fill_storage(archive.storage, data) # Execute the revision-content algorithm on both storages. revisions = [ RevisionEntry(id=rev["id"], date=ts2dt(rev["date"]), root=rev["directory"]) for rev in data["revision"] ] revision_add(provenance, archive, revisions) revision_add(Provenance(provenance_storage), archive, revisions) # Test content_find_first and content_find_all, first only executing the # revision-content algorithm, then adding the origin-revision layer. def adapt_result( result: Optional[ProvenanceResult], with_path: bool ) -> Optional[ProvenanceResult]: if result is not None: return ProvenanceResult( result.content, result.revision, result.date, result.origin, result.path if with_path else b"", ) return result # Execute the origin-revision algorithm on both storages. 
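        # Each OriginEntry below pairs an origin URL with one of its visit snapshots;
        # origin_add() (now under swh.provenance.algos.origin) is expected to walk the
        # snapshot's revision heads and record revision-in-origin links, which is why
        # the ProvenanceResult checked further down carries an origin URL rather than
        # the None seen in the revision-layer variant of this test.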
origins = [ OriginEntry(url=sta["origin"], snapshot=sta["snapshot"]) for sta in data["origin_visit_status"] if sta["snapshot"] is not None ] origin_add(provenance, archive, origins) origin_add(Provenance(provenance_storage), archive, origins) assert adapt_result( ProvenanceResult( content=hash_to_bytes("20329687bb9c1231a7e05afe86160343ad49b494"), revision=hash_to_bytes("c0d8929936631ecbcf9147be6b8aa13b13b014e4"), date=datetime.fromtimestamp(1000000000.0, timezone.utc), origin="https://cmdbts2", path=b"A/B/C/a", ), provenance_storage.with_path(), ) == provenance_storage.content_find_first( hash_to_bytes("20329687bb9c1231a7e05afe86160343ad49b494") ) for cnt in {cnt["sha1_git"] for cnt in data["content"]}: assert adapt_result( provenance.storage.content_find_first(cnt), provenance_storage.with_path(), ) == provenance_storage.content_find_first(cnt) assert { adapt_result(occur, provenance_storage.with_path()) for occur in provenance.storage.content_find_all(cnt) } == set(provenance_storage.content_find_all(cnt)) def test_types(self, provenance_storage: ProvenanceStorageInterface) -> None: """Checks all methods of ProvenanceStorageInterface are implemented by this backend, and that they have the same signature.""" # Create an instance of the protocol (which cannot be instantiated # directly, so this creates a subclass, then instantiates it) interface = type("_", (ProvenanceStorageInterface,), {})() assert "content_find_first" in dir(interface) missing_methods = [] for meth_name in dir(interface): if meth_name.startswith("_"): continue interface_meth = getattr(interface, meth_name) try: concrete_meth = getattr(provenance_storage, meth_name) except AttributeError: if not getattr(interface_meth, "deprecated_endpoint", False): # The backend is missing a (non-deprecated) endpoint missing_methods.append(meth_name) continue expected_signature = inspect.signature(interface_meth) actual_signature = inspect.signature(concrete_meth) assert expected_signature == actual_signature, meth_name assert missing_methods == [] # If all the assertions above succeed, then this one should too. # But there's no harm in double-checking. # And we could replace the assertions above by this one, but unlike # the assertions above, it doesn't explain what is missing. 
assert isinstance(provenance_storage, ProvenanceStorageInterface) def dircontent( data: Dict[str, Any], ref: Sha1Git, dir: Dict[str, Any], prefix: bytes = b"", ) -> Iterable[Tuple[Sha1Git, RelationData]]: content = { ( entry["target"], RelationData(dst=ref, path=os.path.join(prefix, entry["name"])), ) for entry in dir["entries"] if entry["type"] == "file" } for entry in dir["entries"]: if entry["type"] == "dir": child = next( subdir for subdir in data["directory"] if subdir["id"] == entry["target"] ) content.update( dircontent(data, ref, child, os.path.join(prefix, entry["name"])) ) return content def entity_add( storage: ProvenanceStorageInterface, entity: EntityType, ids: Set[Sha1Git] ) -> bool: now = datetime.now(tz=timezone.utc) if entity == EntityType.CONTENT: return storage.content_add({sha1: now for sha1 in ids}) elif entity == EntityType.DIRECTORY: return storage.directory_add( {sha1: DirectoryData(date=now, flat=False) for sha1 in ids} ) else: # entity == EntityType.REVISION: return storage.revision_add( {sha1: RevisionData(date=None, origin=None) for sha1 in ids} ) def relation_add_and_compare_result( storage: ProvenanceStorageInterface, relation: RelationType, data: Dict[Sha1Git, Set[RelationData]], ) -> None: # Source, destinations and locations must be added in advance. src, *_, dst = relation.value.split("_") srcs = {sha1 for sha1 in data} if src != "origin": assert entity_add(storage, EntityType(src), srcs) dsts = {rel.dst for rels in data.values() for rel in rels} if dst != "origin": assert entity_add(storage, EntityType(dst), dsts) if storage.with_path(): assert storage.location_add( {rel.path for rels in data.values() for rel in rels if rel.path is not None} ) assert data assert storage.relation_add(relation, data) for src_sha1 in srcs: relation_compare_result( storage.relation_get(relation, [src_sha1]), {src_sha1: data[src_sha1]}, storage.with_path(), ) for dst_sha1 in dsts: relation_compare_result( storage.relation_get(relation, [dst_sha1], reverse=True), { src_sha1: { RelationData(dst=dst_sha1, path=rel.path) for rel in rels if dst_sha1 == rel.dst } for src_sha1, rels in data.items() if dst_sha1 in {rel.dst for rel in rels} }, storage.with_path(), ) relation_compare_result( storage.relation_get_all(relation), data, storage.with_path() ) def relation_compare_result( computed: Dict[Sha1Git, Set[RelationData]], expected: Dict[Sha1Git, Set[RelationData]], with_path: bool, ) -> None: assert { src_sha1: { RelationData(dst=rel.dst, path=rel.path if with_path else None) for rel in rels } for src_sha1, rels in expected.items() } == computed diff --git a/swh/provenance/tests/test_revision_content_layer.py b/swh/provenance/tests/test_revision_content_layer.py index 2f82eb3..abdf3b5 100644 --- a/swh/provenance/tests/test_revision_content_layer.py +++ b/swh/provenance/tests/test_revision_content_layer.py @@ -1,482 +1,482 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import re from typing import Any, Dict, Iterable, Iterator, List, Optional, Set, Tuple import pytest from typing_extensions import TypedDict from swh.model.hashutil import hash_to_bytes from swh.model.model import Sha1Git +from swh.provenance.algos.directory import directory_add +from swh.provenance.algos.revision import revision_add from swh.provenance.archive import ArchiveInterface -from swh.provenance.directory 
import directory_add from swh.provenance.interface import ProvenanceInterface from swh.provenance.model import DirectoryEntry, RevisionEntry -from swh.provenance.revision import revision_add from swh.provenance.storage.interface import EntityType, RelationType from swh.provenance.tests.conftest import ( fill_storage, get_datafile, load_repo_data, ts2dt, ) class SynthRelation(TypedDict): prefix: Optional[str] path: str src: Sha1Git dst: Sha1Git rel_ts: float class SynthRevision(TypedDict): sha1: Sha1Git date: float msg: str R_C: List[SynthRelation] R_D: List[SynthRelation] D_C: List[SynthRelation] def synthetic_revision_content_result(filename: str) -> Iterator[SynthRevision]: """Generates dict representations of synthetic revisions found in the synthetic file (from the data/ directory) given as argument of the generator. Generated SynthRevision (typed dict) with the following elements: "sha1": (Sha1Git) sha1 of the revision, "date": (float) timestamp of the revision, "msg": (str) commit message of the revision, "R_C": (list) new R---C relations added by this revision "R_D": (list) new R-D relations added by this revision "D_C": (list) new D-C relations added by this revision Each relation above is a SynthRelation typed dict with: "path": (str) location "src": (Sha1Git) sha1 of the source of the relation "dst": (Sha1Git) sha1 of the destination of the relation "rel_ts": (float) timestamp of the target of the relation (related to the timestamp of the revision) """ with open(get_datafile(filename), "r") as fobj: yield from _parse_synthetic_revision_content_file(fobj) def _parse_synthetic_revision_content_file( fobj: Iterable[str], ) -> Iterator[SynthRevision]: """Read a 'synthetic' file and generate a dict representation of the synthetic revision for each revision listed in the synthetic file. 
""" regs = [ "(?PR[0-9]{2,4})?", "(?P[^| ]*)", "([+] )?(?P[^| +]*?)[/]?", "(?P[RDC]) (?P[0-9a-f]{40})", "(?P-?[0-9]+(.[0-9]+)?)", ] regex = re.compile("^ *" + r" *[|] *".join(regs) + r" *(#.*)?$") current_rev: List[dict] = [] for m in (regex.match(line) for line in fobj): if m: d = m.groupdict() if d["revname"]: if current_rev: yield _mk_synth_rev(current_rev) current_rev.clear() current_rev.append(d) if current_rev: yield _mk_synth_rev(current_rev) def _mk_synth_rev(synth_rev: List[Dict[str, str]]) -> SynthRevision: assert synth_rev[0]["type"] == "R" rev = SynthRevision( sha1=hash_to_bytes(synth_rev[0]["sha1"]), date=float(synth_rev[0]["ts"]), msg=synth_rev[0]["revname"], R_C=[], R_D=[], D_C=[], ) current_path = None # path of the last R-D relation we parsed, used a prefix for next D-C # relations for row in synth_rev[1:]: if row["reltype"] == "R---C": assert row["type"] == "C" rev["R_C"].append( SynthRelation( prefix=None, path=row["path"], src=rev["sha1"], dst=hash_to_bytes(row["sha1"]), rel_ts=float(row["ts"]), ) ) current_path = None elif row["reltype"] == "R-D": assert row["type"] == "D" rev["R_D"].append( SynthRelation( prefix=None, path=row["path"], src=rev["sha1"], dst=hash_to_bytes(row["sha1"]), rel_ts=float(row["ts"]), ) ) current_path = row["path"] elif row["reltype"] == "D-C": assert row["type"] == "C" rev["D_C"].append( SynthRelation( prefix=current_path, path=row["path"], src=rev["R_D"][-1]["dst"], dst=hash_to_bytes(row["sha1"]), rel_ts=float(row["ts"]), ) ) return rev @pytest.mark.parametrize( "repo, lower, mindepth, flatten", ( ("cmdbts2", True, 1, True), ("cmdbts2", True, 1, False), ("cmdbts2", False, 1, True), ("cmdbts2", False, 1, False), ("cmdbts2", True, 2, True), ("cmdbts2", True, 2, False), ("cmdbts2", False, 2, True), ("cmdbts2", False, 2, False), ("out-of-order", True, 1, True), ("out-of-order", True, 1, False), ), ) def test_revision_content_result( provenance: ProvenanceInterface, archive: ArchiveInterface, repo: str, lower: bool, mindepth: int, flatten: bool, ) -> None: # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) fill_storage(archive.storage, data) syntheticfile = get_datafile( f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt" ) revisions = {rev["id"]: rev for rev in data["revision"]} rows: Dict[str, Set[Any]] = { "content": set(), "content_in_directory": set(), "content_in_revision": set(), "directory": set(), "directory_in_revision": set(), "location": set(), "revision": set(), } def maybe_path(path: str) -> Optional[bytes]: if provenance.storage.with_path(): return path.encode("utf-8") return None for synth_rev in synthetic_revision_content_result(syntheticfile): revision = revisions[synth_rev["sha1"]] entry = RevisionEntry( id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], ) if flatten: revision_add(provenance, archive, [entry], lower=lower, mindepth=mindepth) else: prev_directories = provenance.storage.entity_get_all(EntityType.DIRECTORY) revision_add( provenance, archive, [entry], lower=lower, mindepth=mindepth, flatten=False, ) directories = [ DirectoryEntry(id=sha1) for sha1 in provenance.storage.entity_get_all( EntityType.DIRECTORY ).difference(prev_directories) ] for directory in directories: assert not provenance.directory_already_flattenned(directory) directory_add(provenance, archive, directories) # each "entry" in the synth file is one new revision rows["revision"].add(synth_rev["sha1"]) assert rows["revision"] == 
provenance.storage.entity_get_all( EntityType.REVISION ), synth_rev["msg"] # check the timestamp of the revision rev_ts = synth_rev["date"] rev_data = provenance.storage.revision_get([synth_rev["sha1"]])[ synth_rev["sha1"] ] assert ( rev_data.date is not None and rev_ts == rev_data.date.timestamp() ), synth_rev["msg"] # this revision might have added new content objects rows["content"] |= set(x["dst"] for x in synth_rev["R_C"]) rows["content"] |= set(x["dst"] for x in synth_rev["D_C"]) assert rows["content"] == provenance.storage.entity_get_all( EntityType.CONTENT ), synth_rev["msg"] # check for R-C (direct) entries # these are added directly in the content_early_in_rev table rows["content_in_revision"] |= set( (x["dst"], x["src"], maybe_path(x["path"])) for x in synth_rev["R_C"] ) assert rows["content_in_revision"] == { (src, rel.dst, rel.path) for src, rels in provenance.storage.relation_get_all( RelationType.CNT_EARLY_IN_REV ).items() for rel in rels }, synth_rev["msg"] # check timestamps for rc in synth_rev["R_C"]: assert ( rev_ts + rc["rel_ts"] == provenance.storage.content_get([rc["dst"]])[rc["dst"]].timestamp() ), synth_rev["msg"] # check directories # each directory stored in the provenance index is an entry # in the "directory" table... rows["directory"] |= set(x["dst"] for x in synth_rev["R_D"]) assert rows["directory"] == provenance.storage.entity_get_all( EntityType.DIRECTORY ), synth_rev["msg"] # ... + a number of rows in the "directory_in_rev" table... # check for R-D entries rows["directory_in_revision"] |= set( (x["dst"], x["src"], maybe_path(x["path"])) for x in synth_rev["R_D"] ) assert rows["directory_in_revision"] == { (src, rel.dst, rel.path) for src, rels in provenance.storage.relation_get_all( RelationType.DIR_IN_REV ).items() for rel in rels }, synth_rev["msg"] # check timestamps for rd in synth_rev["R_D"]: dir_data = provenance.storage.directory_get([rd["dst"]])[rd["dst"]] assert rev_ts + rd["rel_ts"] == dir_data.date.timestamp(), synth_rev["msg"] assert dir_data.flat, synth_rev["msg"] # ... + a number of rows in the "content_in_dir" table # for content of the directory. 
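            # Each expected row added below is a (content_sha1, directory_sha1, path)
            # triple, mirroring the (src, rel.dst, rel.path) tuples read back from
            # relation_get_all(RelationType.CNT_IN_DIR); the path is None whenever the
            # backend stores no paths (see maybe_path() above).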
# check for D-C entries rows["content_in_directory"] |= set( (x["dst"], x["src"], maybe_path(x["path"])) for x in synth_rev["D_C"] ) assert rows["content_in_directory"] == { (src, rel.dst, rel.path) for src, rels in provenance.storage.relation_get_all( RelationType.CNT_IN_DIR ).items() for rel in rels }, synth_rev["msg"] # check timestamps for dc in synth_rev["D_C"]: assert ( rev_ts + dc["rel_ts"] == provenance.storage.content_get([dc["dst"]])[dc["dst"]].timestamp() ), synth_rev["msg"] if provenance.storage.with_path(): # check for location entries rows["location"] |= set(x["path"].encode() for x in synth_rev["R_C"]) rows["location"] |= set(x["path"].encode() for x in synth_rev["D_C"]) rows["location"] |= set(x["path"].encode() for x in synth_rev["R_D"]) assert rows["location"] == provenance.storage.location_get_all(), synth_rev[ "msg" ] @pytest.mark.parametrize( "repo, lower, mindepth", ( ("cmdbts2", True, 1), ("cmdbts2", False, 1), ("cmdbts2", True, 2), ("cmdbts2", False, 2), ("out-of-order", True, 1), ), ) @pytest.mark.parametrize("batch", (True, False)) def test_provenance_heuristics_content_find_all( provenance: ProvenanceInterface, archive: ArchiveInterface, repo: str, lower: bool, mindepth: int, batch: bool, ) -> None: # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) fill_storage(archive.storage, data) revisions = [ RevisionEntry( id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], ) for revision in data["revision"] ] def maybe_path(path: str) -> str: if provenance.storage.with_path(): return path return "" if batch: revision_add(provenance, archive, revisions, lower=lower, mindepth=mindepth) else: for revision in revisions: revision_add( provenance, archive, [revision], lower=lower, mindepth=mindepth ) syntheticfile = get_datafile( f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt" ) expected_occurrences: Dict[str, List[Tuple[str, float, Optional[str], str]]] = {} for synth_rev in synthetic_revision_content_result(syntheticfile): rev_id = synth_rev["sha1"].hex() rev_ts = synth_rev["date"] for rc in synth_rev["R_C"]: expected_occurrences.setdefault(rc["dst"].hex(), []).append( (rev_id, rev_ts, None, maybe_path(rc["path"])) ) for dc in synth_rev["D_C"]: assert dc["prefix"] is not None # to please mypy expected_occurrences.setdefault(dc["dst"].hex(), []).append( (rev_id, rev_ts, None, maybe_path(dc["prefix"] + "/" + dc["path"])) ) for content_id, results in expected_occurrences.items(): expected = [(content_id, *result) for result in results] db_occurrences = [ ( occur.content.hex(), occur.revision.hex(), occur.date.timestamp(), occur.origin, occur.path.decode(), ) for occur in provenance.content_find_all(hash_to_bytes(content_id)) ] if provenance.storage.with_path(): # this is not true if the db stores no path, because a same content # that appears several times in a given revision may be reported # only once by content_find_all() assert len(db_occurrences) == len(expected) assert set(db_occurrences) == set(expected) @pytest.mark.parametrize( "repo, lower, mindepth", ( ("cmdbts2", True, 1), ("cmdbts2", False, 1), ("cmdbts2", True, 2), ("cmdbts2", False, 2), ("out-of-order", True, 1), ), ) @pytest.mark.parametrize("batch", (True, False)) def test_provenance_heuristics_content_find_first( provenance: ProvenanceInterface, archive: ArchiveInterface, repo: str, lower: bool, mindepth: int, batch: bool, ) -> None: # read data/README.md for more details on how these datasets are generated data = 
load_repo_data(repo) fill_storage(archive.storage, data) revisions = [ RevisionEntry( id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], ) for revision in data["revision"] ] if batch: revision_add(provenance, archive, revisions, lower=lower, mindepth=mindepth) else: for revision in revisions: revision_add( provenance, archive, [revision], lower=lower, mindepth=mindepth ) syntheticfile = get_datafile( f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt" ) expected_first: Dict[str, Tuple[str, float, List[str]]] = {} # dict of tuples (blob_id, rev_id, [path, ...]) the third element for path # is a list because a content can be added at several places in a single # revision, in which case the result of content_find_first() is one of # those path, but we have no guarantee which one it will return. for synth_rev in synthetic_revision_content_result(syntheticfile): rev_id = synth_rev["sha1"].hex() rev_ts = synth_rev["date"] for rc in synth_rev["R_C"]: sha1 = rc["dst"].hex() if sha1 not in expected_first: assert rc["rel_ts"] == 0 expected_first[sha1] = (rev_id, rev_ts, [rc["path"]]) else: if rev_ts == expected_first[sha1][1]: expected_first[sha1][2].append(rc["path"]) elif rev_ts < expected_first[sha1][1]: expected_first[sha1] = (rev_id, rev_ts, [rc["path"]]) for dc in synth_rev["D_C"]: sha1 = rc["dst"].hex() assert sha1 in expected_first # nothing to do there, this content cannot be a "first seen file" for content_id, (rev_id, ts, paths) in expected_first.items(): occur = provenance.content_find_first(hash_to_bytes(content_id)) assert occur is not None assert occur.content.hex() == content_id assert occur.revision.hex() == rev_id assert occur.date.timestamp() == ts assert occur.origin is None if provenance.storage.with_path(): assert occur.path.decode() in paths diff --git a/swh/provenance/tests/test_revision_iterator.py b/swh/provenance/tests/test_revision_iterator.py index d4b602d..9151fe4 100644 --- a/swh/provenance/tests/test_revision_iterator.py +++ b/swh/provenance/tests/test_revision_iterator.py @@ -1,31 +1,31 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest -from swh.provenance.revision import CSVRevisionIterator +from swh.provenance.algos.revision import CSVRevisionIterator from swh.provenance.tests.conftest import fill_storage, load_repo_data, ts2dt from swh.storage.interface import StorageInterface @pytest.mark.parametrize( "repo", ( "cmdbts2", "out-of-order", ), ) def test_revision_iterator(swh_storage: StorageInterface, repo: str) -> None: """Test CSVRevisionIterator""" data = load_repo_data(repo) fill_storage(swh_storage, data) revisions_csv = [ (rev["id"], ts2dt(rev["date"]), rev["directory"]) for rev in data["revision"] ] revisions = list(CSVRevisionIterator(revisions_csv)) assert revisions assert len(revisions) == len(data["revision"])
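As a companion to the import changes above, here is a minimal usage sketch of the relocated revision-layer entry points. The wrapper name index_revisions is hypothetical, provenance and archive are assumed to be already-configured instances, and the sketch assumes CSVRevisionIterator yields entries that revision_add() accepts, as the tests above exercise them.

from datetime import datetime
from typing import Iterable, Tuple

from swh.model.model import Sha1Git
from swh.provenance.algos.revision import CSVRevisionIterator, revision_add
from swh.provenance.archive import ArchiveInterface
from swh.provenance.interface import ProvenanceInterface


def index_revisions(
    provenance: ProvenanceInterface,
    archive: ArchiveInterface,
    revisions_csv: Iterable[Tuple[Sha1Git, datetime, Sha1Git]],
    lower: bool = True,
    mindepth: int = 1,
) -> None:
    # revisions_csv holds the same (id, date, root directory) tuples that
    # test_revision_iterator builds from the repository dataset.
    for revision in CSVRevisionIterator(revisions_csv):
        # Run the revision-content heuristics with the same lower/mindepth knobs
        # the tests parametrize over; one revision per call keeps the sketch simple.
        revision_add(provenance, archive, [revision], lower=lower, mindepth=mindepth)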