diff --git a/swh/provenance/cli.py b/swh/provenance/cli.py
index 989d1d8..704260b 100644
--- a/swh/provenance/cli.py
+++ b/swh/provenance/cli.py
@@ -1,198 +1,224 @@
 # Copyright (C) 2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 # WARNING: do not import unnecessary things here to keep cli startup time under
 # control
+from datetime import datetime, timezone
 import os
-from typing import Any, Dict, Optional
+from typing import Any, Dict, Generator, Optional, Tuple
 
 import click
+import iso8601
 import yaml
 
 from swh.core import config
 from swh.core.cli import CONTEXT_SETTINGS
 from swh.core.cli import swh as swh_cli_group
 from swh.model.hashutil import hash_to_bytes, hash_to_hex
+from swh.model.model import Sha1Git
 
 # All generic config code should reside in swh.core.config
 CONFIG_ENVVAR = "SWH_CONFIG_FILE"
 DEFAULT_CONFIG_PATH = os.path.join(click.get_app_dir("swh"), "global.yml")
 DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, DEFAULT_CONFIG_PATH)
 
 DEFAULT_CONFIG: Dict[str, Any] = {
     "archive": {
         "cls": "api",
         "storage": {
             "cls": "remote",
             "url": "http://uffizi.internal.softwareheritage.org:5002",
         }
         # "cls": "direct",
         # "db": {
         #     "host": "db.internal.softwareheritage.org",
         #     "dbname": "softwareheritage",
         #     "user": "guest"
         # }
     },
     "provenance": {"cls": "local", "db": {"host": "localhost", "dbname": "provenance"}},
 }
 
 
 CONFIG_FILE_HELP = f"""Configuration file:
 
 \b
 The CLI option or the environment variable will fail if invalid.
 CLI option is checked first.
 Then, environment variable {CONFIG_ENVVAR} is checked.
 Then, if cannot load the default path, a set of default values are used.
 Default config path is {DEFAULT_CONFIG_PATH}.
 Default config values are:
 
 \b
 {yaml.dump(DEFAULT_CONFIG)}"""
 
 PROVENANCE_HELP = f"""Software Heritage Scanner tools.
 
 {CONFIG_FILE_HELP}"""
 
 
 @swh_cli_group.group(
     name="provenance", context_settings=CONTEXT_SETTINGS, help=PROVENANCE_HELP
 )
 @click.option(
     "-C",
     "--config-file",
     default=None,
     type=click.Path(exists=False, dir_okay=False, path_type=str),
     help="""YAML configuration file.""",
 )
 @click.option(
     "-P",
     "--profile",
     default=None,
     type=click.Path(exists=False, dir_okay=False, path_type=str),
     help="""Enable profiling to specified file.""",
 )
 @click.pass_context
-def cli(ctx, config_file: Optional[str], profile: str):
+def cli(ctx, config_file: Optional[str], profile: str) -> None:
     if config_file is None and config.config_exists(DEFAULT_PATH):
         config_file = DEFAULT_PATH
 
     if config_file is None:
         conf = DEFAULT_CONFIG
     else:
         # read_raw_config do not fail on ENOENT
         if not config.config_exists(config_file):
             raise FileNotFoundError(config_file)
         conf = config.read_raw_config(config.config_basepath(config_file))
         conf = config.merge_configs(DEFAULT_CONFIG, conf)
 
     ctx.ensure_object(dict)
     ctx.obj["config"] = conf
 
     if profile:
         import atexit
         import cProfile
 
         print("Profiling...")
         pr = cProfile.Profile()
         pr.enable()
 
-        def exit():
+        def exit() -> None:
             pr.disable()
             pr.dump_stats(profile)
 
         atexit.register(exit)
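A configuration file passed with -C (or via SWH_CONFIG_FILE) follows the same shape
as DEFAULT_CONFIG above. A minimal sketch of such a file, with placeholder host and
database names rather than real endpoints:

    archive:
      cls: api
      storage:
        cls: remote
        url: http://localhost:5002
    provenance:
      cls: local
      db:
        host: localhost
        dbname: provenance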
{CONFIG_FILE_HELP}""" @swh_cli_group.group( name="provenance", context_settings=CONTEXT_SETTINGS, help=PROVENANCE_HELP ) @click.option( "-C", "--config-file", default=None, type=click.Path(exists=False, dir_okay=False, path_type=str), help="""YAML configuration file.""", ) @click.option( "-P", "--profile", default=None, type=click.Path(exists=False, dir_okay=False, path_type=str), help="""Enable profiling to specified file.""", ) @click.pass_context -def cli(ctx, config_file: Optional[str], profile: str): +def cli(ctx, config_file: Optional[str], profile: str) -> None: if config_file is None and config.config_exists(DEFAULT_PATH): config_file = DEFAULT_PATH if config_file is None: conf = DEFAULT_CONFIG else: # read_raw_config do not fail on ENOENT if not config.config_exists(config_file): raise FileNotFoundError(config_file) conf = config.read_raw_config(config.config_basepath(config_file)) conf = config.merge_configs(DEFAULT_CONFIG, conf) ctx.ensure_object(dict) ctx.obj["config"] = conf if profile: import atexit import cProfile print("Profiling...") pr = cProfile.Profile() pr.enable() - def exit(): + def exit() -> None: pr.disable() pr.dump_stats(profile) atexit.register(exit) @cli.command(name="iter-revisions") @click.argument("filename") @click.option("-a", "--track-all", default=True, type=bool) @click.option("-l", "--limit", type=int) @click.option("-m", "--min-depth", default=1, type=int) @click.option("-r", "--reuse", default=True, type=bool) @click.pass_context -def iter_revisions(ctx, filename, track_all, limit, min_depth, reuse): +def iter_revisions( + ctx, + filename: str, + track_all: bool, + limit: Optional[int], + min_depth: int, + reuse: bool, +) -> None: # TODO: add file size filtering """Process a provided list of revisions.""" from . import get_archive, get_provenance from .revision import CSVRevisionIterator, revision_add archive = get_archive(**ctx.obj["config"]["archive"]) provenance = get_provenance(**ctx.obj["config"]["provenance"]) - revisions_provider = ( - line.strip().split(",") for line in open(filename, "r") if line.strip() - ) + revisions_provider = generate_revision_tuples(filename) revisions = CSVRevisionIterator(revisions_provider, limit=limit) for revision in revisions: revision_add( provenance, archive, [revision], trackall=track_all, lower=reuse, mindepth=min_depth, ) +def generate_revision_tuples( + filename: str, +) -> Generator[Tuple[Sha1Git, datetime, Sha1Git], None, None]: + for line in open(filename, "r"): + if line.strip(): + revision, date, root = line.strip().split(",") + yield ( + hash_to_bytes(revision), + iso8601.parse_date(date, default_timezone=timezone.utc), + hash_to_bytes(root), + ) + + @cli.command(name="iter-origins") @click.argument("filename") @click.option("-l", "--limit", type=int) @click.pass_context -def iter_origins(ctx, filename, limit): +def iter_origins(ctx, filename: str, limit: Optional[int]) -> None: """Process a provided list of origins.""" from . 
 @cli.command(name="iter-origins")
 @click.argument("filename")
 @click.option("-l", "--limit", type=int)
 @click.pass_context
-def iter_origins(ctx, filename, limit):
+def iter_origins(ctx, filename: str, limit: Optional[int]) -> None:
     """Process a provided list of origins."""
     from . import get_archive, get_provenance
     from .origin import CSVOriginIterator, origin_add
 
     archive = get_archive(**ctx.obj["config"]["archive"])
     provenance = get_provenance(**ctx.obj["config"]["provenance"])
-    origins_provider = (
-        line.strip().split(",") for line in open(filename, "r") if line.strip()
-    )
+    origins_provider = generate_origin_tuples(filename)
     origins = CSVOriginIterator(origins_provider, limit=limit)
 
     for origin in origins:
         origin_add(provenance, archive, [origin])
 
 
+def generate_origin_tuples(filename: str) -> Generator[Tuple[str, bytes], None, None]:
+    for line in open(filename, "r"):
+        if line.strip():
+            url, snapshot = line.strip().split(",")
+            yield (url, hash_to_bytes(snapshot))
+
+
 @cli.command(name="find-first")
 @click.argument("swhid")
 @click.pass_context
-def find_first(ctx, swhid):
+def find_first(ctx, swhid: str) -> None:
     """Find first occurrence of the requested blob."""
     from . import get_provenance
 
     provenance = get_provenance(**ctx.obj["config"]["provenance"])
     # TODO: return a dictionary with proper keys for each field
     occur = provenance.content_find_first(hash_to_bytes(swhid))
     if occur is not None:
         print(
             f"swh:1:cnt:{hash_to_hex(occur.content)}, "
             f"swh:1:rev:{hash_to_hex(occur.revision)}, "
             f"{occur.date}, "
             f"{occur.origin}, "
             f"{os.fsdecode(occur.path)}"
         )
     else:
         print(f"Cannot find a content with the id {swhid}")
 
 
 @cli.command(name="find-all")
 @click.argument("swhid")
 @click.option("-l", "--limit", type=int)
 @click.pass_context
-def find_all(ctx, swhid, limit):
+def find_all(ctx, swhid: str, limit: Optional[int]) -> None:
     """Find all occurrences of the requested blob."""
     from . import get_provenance
 
     provenance = get_provenance(**ctx.obj["config"]["provenance"])
     # TODO: return a dictionary with proper keys for each field
     for occur in provenance.content_find_all(hash_to_bytes(swhid), limit=limit):
         print(
             f"swh:1:cnt:{hash_to_hex(occur.content)}, "
             f"swh:1:rev:{hash_to_hex(occur.revision)}, "
             f"{occur.date}, "
             f"{occur.origin}, "
             f"{os.fsdecode(occur.path)}"
         )
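With these helpers in place the commands consume plain CSV files. A hypothetical
session (file names and the hash are invented; despite the argument name, find-first
takes a bare sha1_git hex string, which is what hash_to_bytes expects here):

    $ swh provenance iter-revisions revisions.csv --limit 1000
    $ swh provenance iter-origins origins.csv
    $ swh provenance find-first c07b0e8d2ed3a71c7ea7a7ba6eb6e9b2b56b4a2f

where origins.csv holds one `url,snapshot-sha1_git` row per visit, as parsed by
generate_origin_tuples.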
diff --git a/swh/provenance/origin.py b/swh/provenance/origin.py
index 72996bf..1f0bdf2 100644
--- a/swh/provenance/origin.py
+++ b/swh/provenance/origin.py
@@ -1,101 +1,101 @@
 from itertools import islice
 import logging
 import time
-from typing import Iterable, Iterator, List, Optional, Tuple
+from typing import Generator, Iterable, Iterator, List, Optional, Tuple
 
 from swh.model.model import Sha1Git
 
 from .archive import ArchiveInterface
 from .graph import HistoryNode, build_history_graph
 from .model import OriginEntry, RevisionEntry
 from .provenance import ProvenanceInterface
 
 
 class CSVOriginIterator:
     """Iterator over origin visit statuses typically present in the given CSV
     file.
 
     The input is an iterator that produces 2 elements per row:
 
       (url, snap)
 
     where:
     - url: is the origin url of the visit
     - snap: sha1_git of the snapshot pointed by the visit status
     """
 
     def __init__(
         self,
         statuses: Iterable[Tuple[str, Sha1Git]],
         limit: Optional[int] = None,
-    ):
+    ) -> None:
         self.statuses: Iterator[Tuple[str, Sha1Git]]
         if limit is not None:
             self.statuses = islice(statuses, limit)
         else:
             self.statuses = iter(statuses)
 
-    def __iter__(self):
+    def __iter__(self) -> Generator[OriginEntry, None, None]:
         return (OriginEntry(url, snapshot) for url, snapshot in self.statuses)
 
 
 def origin_add(
     provenance: ProvenanceInterface,
     archive: ArchiveInterface,
     origins: List[OriginEntry],
-):
+) -> None:
     start = time.time()
     for origin in origins:
         provenance.origin_add(origin)
         origin.retrieve_revisions(archive)
         for revision in origin.revisions:
             graph = build_history_graph(archive, provenance, revision)
             origin_add_revision(provenance, origin, graph)
     done = time.time()
     provenance.flush()
     stop = time.time()
     logging.debug(
         "Origins "
         ";".join([origin.id.hex() + ":" + origin.snapshot.hex() for origin in origins])
         + f" were processed in {stop - start} secs (commit took {stop - done} secs)!"
     )
 
 
 def origin_add_revision(
     provenance: ProvenanceInterface,
     origin: OriginEntry,
     graph: HistoryNode,
-):
+) -> None:
     # head is treated separately since it should always be added to the given origin
     head = graph.entry
     check_preferred_origin(provenance, origin, head)
     provenance.revision_add_to_origin(origin, head)
 
     # head's history should be recursively iterated starting from its parents
     stack = list(graph.parents)
     while stack:
         current = stack.pop()
         check_preferred_origin(provenance, origin, current.entry)
 
         if current.visited:
             # if current revision was already visited just add it to the current origin
             # and stop recursion (its history has already been flattened)
             provenance.revision_add_to_origin(origin, current.entry)
         else:
             # if current revision was not visited before create a link between it and
             # the head, and recursively walk its history
             provenance.revision_add_before_revision(head, current.entry)
             for parent in current.parents:
                 stack.append(parent)
 
 
 def check_preferred_origin(
     provenance: ProvenanceInterface,
     origin: OriginEntry,
     revision: RevisionEntry,
-):
+) -> None:
     # if the revision has no preferred origin just set the given origin as the
     # preferred one. TODO: this should be improved in the future!
     preferred = provenance.revision_get_preferred_origin(revision)
     if preferred is None:
         provenance.revision_set_preferred_origin(origin, revision)
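A minimal driver for these functions, assuming archive and provenance instances
already obtained through get_archive/get_provenance as in cli.py (the URL and the
snapshot_sha1_git bytes below are placeholders, not real values):

    statuses = [("https://example.com/repo.git", snapshot_sha1_git)]
    for origin_entry in CSVOriginIterator(statuses):
        origin_add(provenance, archive, [origin_entry])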
diff --git a/swh/provenance/revision.py b/swh/provenance/revision.py
index d857b21..5ea60df 100644
--- a/swh/provenance/revision.py
+++ b/swh/provenance/revision.py
@@ -1,252 +1,241 @@
 from datetime import datetime, timezone
 from itertools import islice
 import logging
 import os
 import time
-from typing import Iterable, Iterator, List, Optional, Tuple
+from typing import Generator, Iterable, Iterator, List, Optional, Tuple
 
-import iso8601
-
-from swh.model.hashutil import hash_to_bytes
 from swh.model.model import Sha1Git
 
 from .archive import ArchiveInterface
 from .graph import IsochroneNode, build_isochrone_graph
 from .model import DirectoryEntry, RevisionEntry
 from .provenance import ProvenanceInterface
 
 
 class CSVRevisionIterator:
     """Iterator over revisions typically present in the given CSV file.
 
     The input is an iterator that produces 3 elements per row:
 
       (id, date, root)
 
     where:
     - id: is the id (sha1_git) of the revision
     - date: is the author date
     - root: sha1 of the directory
     """
 
     def __init__(
         self,
         revisions: Iterable[Tuple[Sha1Git, datetime, Sha1Git]],
         limit: Optional[int] = None,
-    ):
+    ) -> None:
         self.revisions: Iterator[Tuple[Sha1Git, datetime, Sha1Git]]
         if limit is not None:
             self.revisions = islice(revisions, limit)
         else:
             self.revisions = iter(revisions)
 
-    def __iter__(self):
-        return self
-
-    def __next__(self):
-        id, date, root = next(self.revisions)
-        date = iso8601.parse_date(date)
-        if date.tzinfo is None:
-            date = date.replace(tzinfo=timezone.utc)
-        return RevisionEntry(
-            hash_to_bytes(id),
-            date=date,
-            root=hash_to_bytes(root),
-        )
+    def __iter__(self) -> Generator[RevisionEntry, None, None]:
+        for id, date, root in self.revisions:
+            if date.tzinfo is None:
+                date = date.replace(tzinfo=timezone.utc)
+            yield RevisionEntry(id, date=date, root=root)
 
 
 def revision_add(
     provenance: ProvenanceInterface,
     archive: ArchiveInterface,
     revisions: List[RevisionEntry],
     trackall: bool = True,
     lower: bool = True,
     mindepth: int = 1,
     commit: bool = True,
 ) -> None:
     start = time.time()
     for revision in revisions:
         assert revision.date is not None
         assert revision.root is not None
         # Processed content starting from the revision's root directory.
         date = provenance.revision_get_date(revision)
         if date is None or revision.date < date:
             logging.debug(
                 f"Processing revisions {revision.id.hex()}"
                 f" (known date {date} / revision date {revision.date})..."
             )
             graph = build_isochrone_graph(
                 archive,
                 provenance,
                 revision,
                 DirectoryEntry(revision.root),
             )
             # TODO: add file size filtering
             revision_process_content(
                 archive,
                 provenance,
                 revision,
                 graph,
                 trackall=trackall,
                 lower=lower,
                 mindepth=mindepth,
             )
     done = time.time()
     if commit:
         provenance.flush()
     stop = time.time()
     logging.debug(
         f"Revisions {';'.join([revision.id.hex() for revision in revisions])} "
         f" were processed in {stop - start} secs (commit took {stop - done} secs)!"
     )
 
 
 def revision_process_content(
     archive: ArchiveInterface,
     provenance: ProvenanceInterface,
     revision: RevisionEntry,
     graph: IsochroneNode,
     trackall: bool = True,
     lower: bool = True,
     mindepth: int = 1,
-):
+) -> None:
     assert revision.date is not None
     provenance.revision_add(revision)
 
     stack = [graph]
     while stack:
         current = stack.pop()
         if current.dbdate is not None:
             assert current.dbdate <= revision.date
             if trackall:
                 # Current directory is an outer isochrone frontier for a previously
                 # processed revision. It should be reused as is.
                 provenance.directory_add_to_revision(
                     revision, current.entry, current.path
                 )
         else:
             assert current.maxdate is not None
             # Current directory is not an outer isochrone frontier for any previous
             # revision. It might be eligible for this one.
             if is_new_frontier(
                 current,
                 revision=revision,
                 trackall=trackall,
                 lower=lower,
                 mindepth=mindepth,
             ):
                 # Outer frontier should be moved to current position in the isochrone
                 # graph. This is the first time this directory is found in the isochrone
                 # frontier.
                 provenance.directory_set_date_in_isochrone_frontier(
                     current.entry, current.maxdate
                 )
                 if trackall:
                     provenance.directory_add_to_revision(
                         revision, current.entry, current.path
                     )
                     flatten_directory(archive, provenance, current.entry)
             else:
                 # If current node is an invalidated frontier, update its date for future
                 # revisions to get the proper value.
                 if current.invalid:
                     provenance.directory_set_date_in_isochrone_frontier(
                         current.entry, current.maxdate
                     )
                 # No point moving the frontier here. Either there are no files or they
                 # are being seen for the first time here. Add all blobs to current
                 # revision updating date if necessary, and recursively analyse
                 # subdirectories as candidates to the outer frontier.
                 for blob in current.entry.files:
                     date = provenance.content_get_early_date(blob)
                     if date is None or revision.date < date:
                         provenance.content_set_early_date(blob, revision.date)
                     provenance.content_add_to_revision(revision, blob, current.path)
                 for child in current.children:
                     stack.append(child)
 
 
 def flatten_directory(
     archive: ArchiveInterface,
     provenance: ProvenanceInterface,
     directory: DirectoryEntry,
 ) -> None:
     """Recursively retrieve all the files of 'directory' and insert them in the
     'provenance' database in the 'content_to_directory' table.
     """
     stack = [(directory, b"")]
     while stack:
         current, prefix = stack.pop()
         current.retrieve_children(archive)
         for f_child in current.files:
             # Add content to the directory with the computed prefix.
             provenance.content_add_to_directory(directory, f_child, prefix)
         for d_child in current.dirs:
             # Recursively walk the child directory.
             stack.append((d_child, os.path.join(prefix, d_child.name)))
 
 
 def is_new_frontier(
     node: IsochroneNode,
     revision: RevisionEntry,
     trackall: bool = True,
     lower: bool = True,
     mindepth: int = 1,
 ) -> bool:
     assert node.maxdate is not None  # for mypy
     assert revision.date is not None  # idem
     if trackall:
         # The only real condition for a directory to be a frontier is that its
         # content is already known and its maxdate is less (or equal) than
         # current revision's date. Checking mindepth is meant to skip root
         # directories (or any arbitrary depth) to improve the result. The
         # option lower tries to maximize the reusage rate of previously defined
         # frontiers by keeping them low in the directory tree.
         return (
             node.known
             and node.maxdate <= revision.date  # all content is earlier than revision
             and node.depth >= mindepth  # current node is deeper than the min allowed depth
             and (has_blobs(node) if lower else True)  # there is at least one blob in it
         )
     else:
         # If we are only tracking first occurrences, we want to ensure that all first
         # occurrences end up in the content_early_in_rev relation. Thus, we force for
         # every blob outside a frontier to have an extrictly earlier date.
         return (
             node.maxdate < revision.date  # all content is earlier than revision
             and node.depth >= mindepth  # deeper than the min allowed depth
             and (has_blobs(node) if lower else True)  # there is at least one blob
         )
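To make the frontier predicate concrete, a throwaway check using SimpleNamespace
stand-ins for the node and revision (the real IsochroneNode/RevisionEntry carry more
state; this sketch only exercises the boolean logic, with lower=False so has_blobs is
never consulted):

    from datetime import datetime, timezone
    from types import SimpleNamespace
    from swh.provenance.revision import is_new_frontier

    rev = SimpleNamespace(date=datetime(2021, 6, 1, tzinfo=timezone.utc))
    node = SimpleNamespace(
        known=True, maxdate=datetime(2021, 5, 1, tzinfo=timezone.utc), depth=2
    )
    # trackall branch: known content, maxdate <= revision date, depth >= mindepth
    assert is_new_frontier(node, revision=rev, trackall=True, lower=False, mindepth=1)
    # first-occurrences branch additionally needs a strictly earlier maxdate,
    # which also holds here
    assert is_new_frontier(node, revision=rev, trackall=False, lower=False, mindepth=1)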
 
 
 def has_blobs(node: IsochroneNode) -> bool:
     # We may want to look for files in different ways to decide whether to define a
     # frontier or not:
     # 1. Only files in current node:
     return any(node.entry.files)
     # 2. Files anywhere in the isochrone graph
     # stack = [node]
     # while stack:
     #     current = stack.pop()
     #     if any(
     #         map(lambda child: isinstance(child.entry, FileEntry), current.children)):
     #         return True
     #     else:
     #         # All children are directory entries.
     #         stack.extend(current.children)
     # return False
     # 3. Files in the intermediate directories between current node and any previously
     #    defined frontier:
     # TODO: complete this case!
     # return any(
     #     map(lambda child: isinstance(child.entry, FileEntry), node.children)
     # ) or all(
     #     map(
     #         lambda child: (
     #             not (isinstance(child.entry, DirectoryEntry) and child.date is None)
     #         )
     #         or has_blobs(child),
     #         node.children,
     #     )
     # )
diff --git a/swh/provenance/tests/test_revision_iterator.py b/swh/provenance/tests/test_revision_iterator.py
index ab1a779..72409dd 100644
--- a/swh/provenance/tests/test_revision_iterator.py
+++ b/swh/provenance/tests/test_revision_iterator.py
@@ -1,30 +1,29 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import pytest
 
 from swh.provenance.revision import CSVRevisionIterator
 from swh.provenance.tests.conftest import fill_storage, load_repo_data
 from swh.provenance.tests.test_provenance_db import ts2dt
 
 
 @pytest.mark.parametrize(
     "repo",
     (
         "cmdbts2",
         "out-of-order",
     ),
 )
 def test_archive_direct_revision_iterator(swh_storage, repo):
     """Test CSVRevisionIterator"""
     data = load_repo_data(repo)
     fill_storage(swh_storage, data)
     revisions_csv = [
-        (rev["id"], ts2dt(rev["date"]).isoformat(), rev["directory"])
-        for rev in data["revision"]
+        (rev["id"], ts2dt(rev["date"]), rev["directory"]) for rev in data["revision"]
     ]
     revisions = list(CSVRevisionIterator(revisions_csv))
     assert revisions
     assert len(revisions) == len(data["revision"])
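The test update mirrors the new contract: CSVRevisionIterator now receives
already-parsed tuples (revision id as Sha1Git bytes, an aware datetime, root id as
Sha1Git bytes) rather than raw CSV strings, since string parsing moved into cli.py's
generate_revision_tuples. A sketch of building one such tuple by hand (the hashes are
illustrative only):

    from datetime import datetime, timezone
    from swh.model.hashutil import hash_to_bytes

    row = (
        hash_to_bytes("c07b0e8d2ed3a71c7ea7a7ba6eb6e9b2b56b4a2f"),
        datetime(2021, 5, 25, 12, 0, tzinfo=timezone.utc),
        hash_to_bytes("f1f4e1eb2e0b0d7bfb4e3c2a1d0c9b8a7f6e5d4c"),
    )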