diff --git a/swh/provenance/cli.py b/swh/provenance/cli.py index 8dc31f2..ad0d042 100644 --- a/swh/provenance/cli.py +++ b/swh/provenance/cli.py @@ -1,255 +1,292 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # WARNING: do not import unnecessary things here to keep cli startup time under # control from datetime import datetime, timezone import os from typing import Any, Dict, Generator, Optional, Tuple import click import iso8601 import yaml from swh.core import config from swh.core.cli import CONTEXT_SETTINGS from swh.core.cli import swh as swh_cli_group from swh.model.hashutil import hash_to_bytes, hash_to_hex from swh.model.model import Sha1Git # All generic config code should reside in swh.core.config CONFIG_ENVVAR = "SWH_CONFIG_FILENAME" DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, None) DEFAULT_CONFIG: Dict[str, Any] = { "provenance": { "archive": { # Storage API based Archive object # "cls": "api", # "storage": { # "cls": "remote", # "url": "http://uffizi.internal.softwareheritage.org:5002", # } # Direct access Archive object "cls": "direct", "db": { "host": "belvedere.internal.softwareheritage.org", "port": 5432, "dbname": "softwareheritage", "user": "guest", }, }, "storage": { # Local PostgreSQL Storage # "cls": "postgresql", # "db": { # "host": "localhost", # "user": "postgres", # "password": "postgres", # "dbname": "provenance", # }, # Local MongoDB Storage # "cls": "mongodb", # "db": { # "dbname": "provenance", # }, # Remote RabbitMQ/PostgreSQL Storage "cls": "rabbitmq", "url": "amqp://localhost:5672/%2f", "storage_config": { "cls": "postgresql", "db": { "host": "localhost", "user": "postgres", "password": "postgres", "dbname": "provenance", }, }, "batch_size": 100, "prefetch_count": 100, }, } } CONFIG_FILE_HELP = f""" \b Configuration can be loaded from a yaml file given either as --config-file option or the {CONFIG_ENVVAR} environment variable. If no configuration file is specified, use the following default configuration:: \b {yaml.dump(DEFAULT_CONFIG)}""" PROVENANCE_HELP = f"""Software Heritage provenance index database tools {CONFIG_FILE_HELP} """ @swh_cli_group.group( name="provenance", context_settings=CONTEXT_SETTINGS, help=PROVENANCE_HELP ) @click.option( "-C", "--config-file", default=None, type=click.Path(exists=True, dir_okay=False, path_type=str), help="""YAML configuration file.""", ) @click.option( "-P", "--profile", default=None, type=click.Path(exists=False, dir_okay=False, path_type=str), help="""Enable profiling to specified file.""", ) @click.pass_context def cli(ctx: click.core.Context, config_file: Optional[str], profile: str) -> None: if ( config_file is None and DEFAULT_PATH is not None and config.config_exists(DEFAULT_PATH) ): config_file = DEFAULT_PATH if config_file is None: conf = DEFAULT_CONFIG else: # read_raw_config do not fail on ENOENT if not os.path.exists(config_file): raise FileNotFoundError(config_file) conf = yaml.safe_load(open(config_file, "rb")) ctx.ensure_object(dict) ctx.obj["config"] = conf if profile: import atexit import cProfile print("Profiling...") pr = cProfile.Profile() pr.enable() def exit() -> None: pr.disable() pr.dump_stats(profile) atexit.register(exit) +@cli.command(name="iter-frontiers") +@click.argument("filename") +@click.option("-l", "--limit", type=int) +@click.option("-s", "--min-size", default=0, type=int) +@click.pass_context +def iter_frontiers( + ctx: click.core.Context, + filename: str, + limit: Optional[int], + min_size: int, +) -> None: + """Process a provided list of directories in the isochrone frontier.""" + from . import get_archive, get_provenance + from .directory import CSVDirectoryIterator, directory_add + + archive = get_archive(**ctx.obj["config"]["provenance"]["archive"]) + directories_provider = generate_directory_ids(filename) + directories = CSVDirectoryIterator(directories_provider, limit=limit) + + with get_provenance(**ctx.obj["config"]["provenance"]["storage"]) as provenance: + for directory in directories: + directory_add( + provenance, + archive, + [directory], + minsize=min_size, + ) + + +def generate_directory_ids( + filename: str, +) -> Generator[Sha1Git, None, None]: + for line in open(filename, "r"): + if line.strip(): + yield hash_to_bytes(line.strip()) + + @cli.command(name="iter-revisions") @click.argument("filename") @click.option("-a", "--track-all", default=True, type=bool) @click.option("-l", "--limit", type=int) @click.option("-m", "--min-depth", default=1, type=int) @click.option("-r", "--reuse", default=True, type=bool) @click.option("-s", "--min-size", default=0, type=int) @click.pass_context def iter_revisions( ctx: click.core.Context, filename: str, track_all: bool, limit: Optional[int], min_depth: int, reuse: bool, min_size: int, ) -> None: """Process a provided list of revisions.""" from . import get_archive, get_provenance from .revision import CSVRevisionIterator, revision_add archive = get_archive(**ctx.obj["config"]["provenance"]["archive"]) revisions_provider = generate_revision_tuples(filename) revisions = CSVRevisionIterator(revisions_provider, limit=limit) with get_provenance(**ctx.obj["config"]["provenance"]["storage"]) as provenance: for revision in revisions: revision_add( provenance, archive, [revision], trackall=track_all, lower=reuse, mindepth=min_depth, minsize=min_size, ) def generate_revision_tuples( filename: str, ) -> Generator[Tuple[Sha1Git, datetime, Sha1Git], None, None]: for line in open(filename, "r"): if line.strip(): revision, date, root = line.strip().split(",") yield ( hash_to_bytes(revision), iso8601.parse_date(date, default_timezone=timezone.utc), hash_to_bytes(root), ) @cli.command(name="iter-origins") @click.argument("filename") @click.option("-l", "--limit", type=int) @click.pass_context def iter_origins(ctx: click.core.Context, filename: str, limit: Optional[int]) -> None: """Process a provided list of origins.""" from . import get_archive, get_provenance from .origin import CSVOriginIterator, origin_add archive = get_archive(**ctx.obj["config"]["provenance"]["archive"]) origins_provider = generate_origin_tuples(filename) origins = CSVOriginIterator(origins_provider, limit=limit) with get_provenance(**ctx.obj["config"]["provenance"]["storage"]) as provenance: for origin in origins: origin_add(provenance, archive, [origin]) def generate_origin_tuples(filename: str) -> Generator[Tuple[str, bytes], None, None]: for line in open(filename, "r"): if line.strip(): url, snapshot = line.strip().split(",") yield (url, hash_to_bytes(snapshot)) @cli.command(name="find-first") @click.argument("swhid") @click.pass_context def find_first(ctx: click.core.Context, swhid: str) -> None: """Find first occurrence of the requested blob.""" from . import get_provenance with get_provenance(**ctx.obj["config"]["provenance"]["storage"]) as provenance: occur = provenance.content_find_first(hash_to_bytes(swhid)) if occur is not None: print( f"swh:1:cnt:{hash_to_hex(occur.content)}, " f"swh:1:rev:{hash_to_hex(occur.revision)}, " f"{occur.date}, " f"{occur.origin}, " f"{os.fsdecode(occur.path)}" ) else: print(f"Cannot find a content with the id {swhid}") @cli.command(name="find-all") @click.argument("swhid") @click.option("-l", "--limit", type=int) @click.pass_context def find_all(ctx: click.core.Context, swhid: str, limit: Optional[int]) -> None: """Find all occurrences of the requested blob.""" from . import get_provenance with get_provenance(**ctx.obj["config"]["provenance"]["storage"]) as provenance: for occur in provenance.content_find_all(hash_to_bytes(swhid), limit=limit): print( f"swh:1:cnt:{hash_to_hex(occur.content)}, " f"swh:1:rev:{hash_to_hex(occur.revision)}, " f"{occur.date}, " f"{occur.origin}, " f"{os.fsdecode(occur.path)}" ) diff --git a/swh/provenance/directory.py b/swh/provenance/directory.py new file mode 100644 index 0000000..5430454 --- /dev/null +++ b/swh/provenance/directory.py @@ -0,0 +1,86 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import os +from typing import Generator, Iterable, Iterator, List, Optional + +from swh.core.statsd import statsd +from swh.model.model import Sha1Git + +from .archive import ArchiveInterface +from .interface import ProvenanceInterface +from .model import DirectoryEntry + +REVISION_DURATION_METRIC = "swh_provenance_directory_duration_seconds" + + +class CSVDirectoryIterator: + """Iterator over directories typically present in the given CSV file. + + The input is an iterator that produces ids (sha1_git) of directories + """ + + def __init__( + self, + directories: Iterable[Sha1Git], + limit: Optional[int] = None, + ) -> None: + self.directories: Iterator[Sha1Git] + if limit is not None: + from itertools import islice + + self.directories = islice(directories, limit) + else: + self.directories = iter(directories) + + def __iter__(self) -> Generator[DirectoryEntry, None, None]: + for id in self.directories: + yield DirectoryEntry(id) + + +@statsd.timed(metric=REVISION_DURATION_METRIC, tags={"method": "main"}) +def directory_add( + provenance: ProvenanceInterface, + archive: ArchiveInterface, + directories: List[DirectoryEntry], + minsize: int = 0, + commit: bool = True, +) -> None: + for directory in directories: + # Only flatten directories that are present in the provenance model, but not + # flattenned yet. + flattenned = provenance.directory_already_flattenned(directory) + if flattenned is not None and not flattenned: + directory_flatten( + provenance, + archive, + directory, + minsize=minsize, + ) + if commit: + provenance.flush() + + +@statsd.timed(metric=REVISION_DURATION_METRIC, tags={"method": "flatten"}) +def directory_flatten( + provenance: ProvenanceInterface, + archive: ArchiveInterface, + directory: DirectoryEntry, + minsize: int = 0, +) -> None: + """Recursively retrieve all the files of 'directory' and insert them in the + 'provenance' database in the 'content_to_directory' table. + """ + stack = [(directory, b"")] + while stack: + current, prefix = stack.pop() + current.retrieve_children(archive, minsize=minsize) + for f_child in current.files: + # Add content to the directory with the computed prefix. + provenance.content_add_to_directory(directory, f_child, prefix) + for d_child in current.dirs: + # Recursively walk the child directory. + stack.append((d_child, os.path.join(prefix, d_child.name))) + provenance.directory_flag_as_flattenned(directory) diff --git a/swh/provenance/interface.py b/swh/provenance/interface.py index 7b5ba3f..78a7279 100644 --- a/swh/provenance/interface.py +++ b/swh/provenance/interface.py @@ -1,384 +1,396 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from __future__ import annotations from dataclasses import dataclass from datetime import datetime import enum from types import TracebackType from typing import Dict, Generator, Iterable, Optional, Set, Type, Union from typing_extensions import Protocol, runtime_checkable from swh.core.api import remote_api_endpoint from swh.model.model import Sha1Git from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry class EntityType(enum.Enum): CONTENT = "content" DIRECTORY = "directory" REVISION = "revision" ORIGIN = "origin" class RelationType(enum.Enum): CNT_EARLY_IN_REV = "content_in_revision" CNT_IN_DIR = "content_in_directory" DIR_IN_REV = "directory_in_revision" REV_IN_ORG = "revision_in_origin" REV_BEFORE_REV = "revision_before_revision" @dataclass(eq=True, frozen=True) class ProvenanceResult: content: Sha1Git revision: Sha1Git date: datetime origin: Optional[str] path: bytes @dataclass(eq=True, frozen=True) class DirectoryData: """Object representing the data associated to a directory in the provenance model, where `date` is the date of the directory in the isochrone frontier, and `flat` is a flag acknowledging that a flat model for the elements outside the frontier has already been created. """ date: datetime flat: bool @dataclass(eq=True, frozen=True) class RevisionData: """Object representing the data associated to a revision in the provenance model, where `date` is the optional date of the revision (specifying it acknowledges that the revision was already processed by the revision-content algorithm); and `origin` identifies the preferred origin for the revision, if any. """ date: Optional[datetime] origin: Optional[Sha1Git] @dataclass(eq=True, frozen=True) class RelationData: """Object representing a relation entry in the provenance model, where `src` and `dst` are the sha1 ids of the entities being related, and `path` is optional depending on the relation being represented. """ dst: Sha1Git path: Optional[bytes] @runtime_checkable class ProvenanceStorageInterface(Protocol): def __enter__(self) -> ProvenanceStorageInterface: ... def __exit__( self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType], ) -> None: ... @remote_api_endpoint("close") def close(self) -> None: """Close connection to the storage and release resources.""" ... @remote_api_endpoint("content_add") def content_add(self, cnts: Dict[Sha1Git, datetime]) -> bool: """Add blobs identified by sha1 ids, with an associated date (as paired in `cnts`) to the provenance storage. Return a boolean stating whether the information was successfully stored. """ ... @remote_api_endpoint("content_find_first") def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: """Retrieve the first occurrence of the blob identified by `id`.""" ... @remote_api_endpoint("content_find_all") def content_find_all( self, id: Sha1Git, limit: Optional[int] = None ) -> Generator[ProvenanceResult, None, None]: """Retrieve all the occurrences of the blob identified by `id`.""" ... @remote_api_endpoint("content_get") def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: """Retrieve the associated date for each blob sha1 in `ids`.""" ... @remote_api_endpoint("directory_add") def directory_add(self, dirs: Dict[Sha1Git, DirectoryData]) -> bool: """Add directories identified by sha1 ids, with associated date and (optional) flatten flag (as paired in `dirs`) to the provenance storage. If the flatten flag is set to None, the previous value present in the storage is preserved. Return a boolean stating if the information was successfully stored. """ ... @remote_api_endpoint("directory_get") def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, DirectoryData]: """Retrieve the associated date and (optional) flatten flag for each directory sha1 in `ids`. If some directories has no associated date, it is not present in the resulting dictionary. """ ... @remote_api_endpoint("entity_get_all") def entity_get_all(self, entity: EntityType) -> Set[Sha1Git]: """Retrieve all sha1 ids for entities of type `entity` present in the provenance model. This method is used only in tests. """ ... @remote_api_endpoint("location_add") def location_add(self, paths: Iterable[bytes]) -> bool: """Register the given `paths` in the storage.""" ... @remote_api_endpoint("location_get_all") def location_get_all(self) -> Set[bytes]: """Retrieve all paths present in the provenance model. This method is used only in tests.""" ... @remote_api_endpoint("open") def open(self) -> None: """Open connection to the storage and allocate necessary resources.""" ... @remote_api_endpoint("origin_add") def origin_add(self, orgs: Dict[Sha1Git, str]) -> bool: """Add origins identified by sha1 ids, with their corresponding url (as paired in `orgs`) to the provenance storage. Return a boolean stating if the information was successfully stored. """ ... @remote_api_endpoint("origin_get") def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]: """Retrieve the associated url for each origin sha1 in `ids`.""" ... @remote_api_endpoint("revision_add") def revision_add( self, revs: Union[Iterable[Sha1Git], Dict[Sha1Git, RevisionData]] ) -> bool: """Add revisions identified by sha1 ids, with optional associated date or origin (as paired in `revs`) to the provenance storage. Return a boolean stating if the information was successfully stored. """ ... @remote_api_endpoint("revision_get") def revision_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, RevisionData]: """Retrieve the associated date and origin for each revision sha1 in `ids`. If some revision has no associated date nor origin, it is not present in the resulting dictionary. """ ... @remote_api_endpoint("relation_add") def relation_add( self, relation: RelationType, data: Dict[Sha1Git, Set[RelationData]] ) -> bool: """Add entries in the selected `relation`. This method assumes all entities being related are already registered in the storage. See `content_add`, `directory_add`, `origin_add`, and `revision_add`. """ ... @remote_api_endpoint("relation_get") def relation_get( self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False ) -> Dict[Sha1Git, Set[RelationData]]: """Retrieve all entries in the selected `relation` whose source entities are identified by some sha1 id in `ids`. If `reverse` is set, destination entities are matched instead. """ ... @remote_api_endpoint("relation_get_all") def relation_get_all( self, relation: RelationType ) -> Dict[Sha1Git, Set[RelationData]]: """Retrieve all entries in the selected `relation` that are present in the provenance model. This method is used only in tests. """ ... @remote_api_endpoint("with_path") def with_path(self) -> bool: ... @runtime_checkable class ProvenanceInterface(Protocol): storage: ProvenanceStorageInterface def __enter__(self) -> ProvenanceInterface: ... def __exit__( self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType], ) -> None: ... def close(self) -> None: """Close connection to the underlying `storage` and release resources.""" ... def flush(self) -> None: """Flush internal cache to the underlying `storage`.""" ... def content_add_to_directory( self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes ) -> None: """Associate `blob` with `directory` in the provenance model. `prefix` is the relative path from `directory` to `blob` (excluding `blob`'s name). """ ... def content_add_to_revision( self, revision: RevisionEntry, blob: FileEntry, prefix: bytes ) -> None: """Associate `blob` with `revision` in the provenance model. `prefix` is the absolute path from `revision`'s root directory to `blob` (excluding `blob`'s name). """ ... def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: """Retrieve the first occurrence of the blob identified by `id`.""" ... def content_find_all( self, id: Sha1Git, limit: Optional[int] = None ) -> Generator[ProvenanceResult, None, None]: """Retrieve all the occurrences of the blob identified by `id`.""" ... def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: """Retrieve the earliest known date of `blob`.""" ... def content_get_early_dates( self, blobs: Iterable[FileEntry] ) -> Dict[Sha1Git, datetime]: """Retrieve the earliest known date for each blob in `blobs`. If some blob has no associated date, it is not present in the resulting dictionary. """ ... def content_set_early_date(self, blob: FileEntry, date: datetime) -> None: """Associate `date` to `blob` as it's earliest known date.""" ... def directory_add_to_revision( self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes ) -> None: """Associate `directory` with `revision` in the provenance model. `path` is the absolute path from `revision`'s root directory to `directory` (including `directory`'s name). """ ... + def directory_already_flattenned(self, directory: DirectoryEntry) -> Optional[bool]: + """Check if the directory is already flattenned in the provenance model. If the + directory is unknown for the model, the methods returns None. + """ + ... + + def directory_flag_as_flattenned(self, directory: DirectoryEntry) -> None: + """Mark the directory as flattenned in the provenance model. If the + directory is unknown for the model, this method has no effect. + """ + ... + def directory_get_date_in_isochrone_frontier( self, directory: DirectoryEntry ) -> Optional[datetime]: """Retrieve the earliest known date of `directory` as an isochrone frontier in the provenance model. """ ... def directory_get_dates_in_isochrone_frontier( self, dirs: Iterable[DirectoryEntry] ) -> Dict[Sha1Git, datetime]: """Retrieve the earliest known date for each directory in `dirs` as isochrone frontiers provenance model. If some directory has no associated date, it is not present in the resulting dictionary. """ ... def directory_set_date_in_isochrone_frontier( self, directory: DirectoryEntry, date: datetime ) -> None: """Associate `date` to `directory` as it's earliest known date as an isochrone frontier in the provenance model. """ ... def open(self) -> None: """Open connection to the underlying `storage` and allocate necessary resources. """ ... def origin_add(self, origin: OriginEntry) -> None: """Add `origin` to the provenance model.""" ... def revision_add(self, revision: RevisionEntry) -> None: """Add `revision` to the provenance model. This implies storing `revision`'s date in the model, thus `revision.date` must be a valid date. """ ... def revision_add_before_revision( self, head: RevisionEntry, revision: RevisionEntry ) -> None: """Associate `revision` to `head` as an ancestor of the latter.""" ... def revision_add_to_origin( self, origin: OriginEntry, revision: RevisionEntry ) -> None: """Associate `revision` to `origin` as a head revision of the latter (ie. the target of an snapshot for `origin` in the archive).""" ... def revision_get_date(self, revision: RevisionEntry) -> Optional[datetime]: """Retrieve the date associated to `revision`.""" ... def revision_get_preferred_origin( self, revision: RevisionEntry ) -> Optional[Sha1Git]: """Retrieve the preferred origin associated to `revision`.""" ... def revision_in_history(self, revision: RevisionEntry) -> bool: """Check if `revision` is known to be an ancestor of some head revision in the provenance model. """ ... def revision_set_preferred_origin( self, origin: OriginEntry, revision: RevisionEntry ) -> None: """Associate `origin` as the preferred origin for `revision`.""" ... def revision_visited(self, revision: RevisionEntry) -> bool: """Check if `revision` is known to be a head revision for some origin in the provenance model. """ ... diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py index 5ca39b2..b0f43c6 100644 --- a/swh/provenance/provenance.py +++ b/swh/provenance/provenance.py @@ -1,489 +1,504 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime import logging import os from types import TracebackType from typing import Dict, Generator, Iterable, Optional, Set, Tuple, Type from typing_extensions import Literal, TypedDict from swh.core.statsd import statsd from swh.model.model import Sha1Git from .interface import ( DirectoryData, ProvenanceInterface, ProvenanceResult, ProvenanceStorageInterface, RelationData, RelationType, RevisionData, ) from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry from .util import path_normalize LOGGER = logging.getLogger(__name__) BACKEND_DURATION_METRIC = "swh_provenance_backend_duration_seconds" BACKEND_OPERATIONS_METRIC = "swh_provenance_backend_operations_total" class DatetimeCache(TypedDict): data: Dict[Sha1Git, Optional[datetime]] # None means unknown added: Set[Sha1Git] class OriginCache(TypedDict): data: Dict[Sha1Git, str] added: Set[Sha1Git] class RevisionCache(TypedDict): data: Dict[Sha1Git, Sha1Git] added: Set[Sha1Git] class ProvenanceCache(TypedDict): content: DatetimeCache directory: DatetimeCache directory_flatten: Dict[Sha1Git, Optional[bool]] # None means unknown revision: DatetimeCache # below are insertion caches only content_in_revision: Set[Tuple[Sha1Git, Sha1Git, bytes]] content_in_directory: Set[Tuple[Sha1Git, Sha1Git, bytes]] directory_in_revision: Set[Tuple[Sha1Git, Sha1Git, bytes]] # these two are for the origin layer origin: OriginCache revision_origin: RevisionCache revision_before_revision: Dict[Sha1Git, Set[Sha1Git]] revision_in_origin: Set[Tuple[Sha1Git, Sha1Git]] def new_cache() -> ProvenanceCache: return ProvenanceCache( content=DatetimeCache(data={}, added=set()), directory=DatetimeCache(data={}, added=set()), directory_flatten={}, revision=DatetimeCache(data={}, added=set()), content_in_revision=set(), content_in_directory=set(), directory_in_revision=set(), origin=OriginCache(data={}, added=set()), revision_origin=RevisionCache(data={}, added=set()), revision_before_revision={}, revision_in_origin=set(), ) class Provenance: def __init__(self, storage: ProvenanceStorageInterface) -> None: self.storage = storage self.cache = new_cache() def __enter__(self) -> ProvenanceInterface: self.open() return self def __exit__( self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType], ) -> None: self.close() def clear_caches(self) -> None: self.cache = new_cache() def close(self) -> None: self.storage.close() @statsd.timed(metric=BACKEND_DURATION_METRIC, tags={"method": "flush"}) def flush(self) -> None: self.flush_revision_content_layer() self.flush_origin_revision_layer() self.clear_caches() @statsd.timed( metric=BACKEND_DURATION_METRIC, tags={"method": "flush_origin_revision"} ) def flush_origin_revision_layer(self) -> None: # Origins and revisions should be inserted first so that internal ids' # resolution works properly. urls = { sha1: url for sha1, url in self.cache["origin"]["data"].items() if sha1 in self.cache["origin"]["added"] } if urls: while not self.storage.origin_add(urls): statsd.increment( metric=BACKEND_OPERATIONS_METRIC, tags={"method": "flush_origin_revision_retry_origin"}, ) LOGGER.warning( "Unable to write origins urls to the storage. Retrying..." ) rev_orgs = { # Destinations in this relation should match origins in the next one **{ src: RevisionData(date=None, origin=None) for src in self.cache["revision_before_revision"] }, **{ # This relation comes second so that non-None origins take precedence src: RevisionData(date=None, origin=org) for src, org in self.cache["revision_in_origin"] }, } if rev_orgs: while not self.storage.revision_add(rev_orgs): statsd.increment( metric=BACKEND_OPERATIONS_METRIC, tags={"method": "flush_origin_revision_retry_revision"}, ) LOGGER.warning( "Unable to write revision entities to the storage. Retrying..." ) # Second, flat models for revisions' histories (ie. revision-before-revision). if self.cache["revision_before_revision"]: rev_before_rev = { src: {RelationData(dst=dst, path=None) for dst in dsts} for src, dsts in self.cache["revision_before_revision"].items() } while not self.storage.relation_add( RelationType.REV_BEFORE_REV, rev_before_rev ): statsd.increment( metric=BACKEND_OPERATIONS_METRIC, tags={ "method": "flush_origin_revision_retry_revision_before_revision" }, ) LOGGER.warning( "Unable to write %s rows to the storage. Retrying...", RelationType.REV_BEFORE_REV, ) # Heads (ie. revision-in-origin entries) should be inserted once flat models for # their histories were already added. This is to guarantee consistent results if # something needs to be reprocessed due to a failure: already inserted heads # won't get reprocessed in such a case. if self.cache["revision_in_origin"]: rev_in_org: Dict[Sha1Git, Set[RelationData]] = {} for src, dst in self.cache["revision_in_origin"]: rev_in_org.setdefault(src, set()).add(RelationData(dst=dst, path=None)) while not self.storage.relation_add(RelationType.REV_IN_ORG, rev_in_org): statsd.increment( metric=BACKEND_OPERATIONS_METRIC, tags={"method": "flush_origin_revision_retry_revision_in_origin"}, ) LOGGER.warning( "Unable to write %s rows to the storage. Retrying...", RelationType.REV_IN_ORG, ) @statsd.timed( metric=BACKEND_DURATION_METRIC, tags={"method": "flush_revision_content"} ) def flush_revision_content_layer(self) -> None: # Register in the storage all entities, to ensure the coming relations can # properly resolve any internal reference if needed. Content and directory # entries may safely be registered with their associated dates. In contrast, # revision entries should be registered without date, as it is used to # acknowledge that the flushing was successful. Also, directories are # registered with their flatten flag not set. cnt_dates = { sha1: date for sha1, date in self.cache["content"]["data"].items() if sha1 in self.cache["content"]["added"] and date is not None } if cnt_dates: while not self.storage.content_add(cnt_dates): statsd.increment( metric=BACKEND_OPERATIONS_METRIC, tags={"method": "flush_revision_content_retry_content_date"}, ) LOGGER.warning( "Unable to write content dates to the storage. Retrying..." ) dir_dates = { sha1: DirectoryData(date=date, flat=False) for sha1, date in self.cache["directory"]["data"].items() if sha1 in self.cache["directory"]["added"] and date is not None } if dir_dates: while not self.storage.directory_add(dir_dates): statsd.increment( metric=BACKEND_OPERATIONS_METRIC, tags={"method": "flush_revision_content_retry_directory_date"}, ) LOGGER.warning( "Unable to write directory dates to the storage. Retrying..." ) revs = { sha1 for sha1, date in self.cache["revision"]["data"].items() if sha1 in self.cache["revision"]["added"] and date is not None } if revs: while not self.storage.revision_add(revs): statsd.increment( metric=BACKEND_OPERATIONS_METRIC, tags={"method": "flush_revision_content_retry_revision_none"}, ) LOGGER.warning( "Unable to write revision entities to the storage. Retrying..." ) paths = { path for _, _, path in self.cache["content_in_revision"] | self.cache["content_in_directory"] | self.cache["directory_in_revision"] } if paths: while not self.storage.location_add(paths): statsd.increment( metric=BACKEND_OPERATIONS_METRIC, tags={"method": "flush_revision_content_retry_location"}, ) LOGGER.warning( "Unable to write locations entities to the storage. Retrying..." ) # For this layer, relations need to be inserted first so that, in case of # failure, reprocessing the input does not generated an inconsistent database. if self.cache["content_in_revision"]: cnt_in_rev: Dict[Sha1Git, Set[RelationData]] = {} for src, dst, path in self.cache["content_in_revision"]: cnt_in_rev.setdefault(src, set()).add(RelationData(dst=dst, path=path)) while not self.storage.relation_add( RelationType.CNT_EARLY_IN_REV, cnt_in_rev ): statsd.increment( metric=BACKEND_OPERATIONS_METRIC, tags={"method": "flush_revision_content_retry_content_in_revision"}, ) LOGGER.warning( "Unable to write %s rows to the storage. Retrying...", RelationType.CNT_EARLY_IN_REV, ) if self.cache["content_in_directory"]: cnt_in_dir: Dict[Sha1Git, Set[RelationData]] = {} for src, dst, path in self.cache["content_in_directory"]: cnt_in_dir.setdefault(src, set()).add(RelationData(dst=dst, path=path)) while not self.storage.relation_add(RelationType.CNT_IN_DIR, cnt_in_dir): statsd.increment( metric=BACKEND_OPERATIONS_METRIC, tags={ "method": "flush_revision_content_retry_content_in_directory" }, ) LOGGER.warning( "Unable to write %s rows to the storage. Retrying...", RelationType.CNT_IN_DIR, ) if self.cache["directory_in_revision"]: dir_in_rev: Dict[Sha1Git, Set[RelationData]] = {} for src, dst, path in self.cache["directory_in_revision"]: dir_in_rev.setdefault(src, set()).add(RelationData(dst=dst, path=path)) while not self.storage.relation_add(RelationType.DIR_IN_REV, dir_in_rev): statsd.increment( metric=BACKEND_OPERATIONS_METRIC, tags={ "method": "flush_revision_content_retry_directory_in_revision" }, ) LOGGER.warning( "Unable to write %s rows to the storage. Retrying...", RelationType.DIR_IN_REV, ) # After relations, flatten flags for directories can be safely set (if # applicable) acknowledging those directories that have already be flattened. # Similarly, dates for the revisions are set to acknowledge that these revisions # won't need to be reprocessed in case of failure. dir_acks = { sha1: DirectoryData( date=date, flat=self.cache["directory_flatten"].get(sha1) or False ) for sha1, date in self.cache["directory"]["data"].items() - if sha1 in self.cache["directory"]["added"] and date is not None + if self.cache["directory_flatten"].get(sha1) and date is not None } if dir_acks: while not self.storage.directory_add(dir_acks): statsd.increment( metric=BACKEND_OPERATIONS_METRIC, tags={"method": "flush_revision_content_retry_directory_ack"}, ) LOGGER.warning( "Unable to write directory dates to the storage. Retrying..." ) rev_dates = { sha1: RevisionData(date=date, origin=None) for sha1, date in self.cache["revision"]["data"].items() if sha1 in self.cache["revision"]["added"] and date is not None } if rev_dates: while not self.storage.revision_add(rev_dates): statsd.increment( metric=BACKEND_OPERATIONS_METRIC, tags={"method": "flush_revision_content_retry_revision_date"}, ) LOGGER.warning( "Unable to write revision dates to the storage. Retrying..." ) def content_add_to_directory( self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes ) -> None: self.cache["content_in_directory"].add( (blob.id, directory.id, path_normalize(os.path.join(prefix, blob.name))) ) def content_add_to_revision( self, revision: RevisionEntry, blob: FileEntry, prefix: bytes ) -> None: self.cache["content_in_revision"].add( (blob.id, revision.id, path_normalize(os.path.join(prefix, blob.name))) ) def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: return self.storage.content_find_first(id) def content_find_all( self, id: Sha1Git, limit: Optional[int] = None ) -> Generator[ProvenanceResult, None, None]: yield from self.storage.content_find_all(id, limit=limit) def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: return self.get_dates("content", [blob.id]).get(blob.id) def content_get_early_dates( self, blobs: Iterable[FileEntry] ) -> Dict[Sha1Git, datetime]: return self.get_dates("content", [blob.id for blob in blobs]) def content_set_early_date(self, blob: FileEntry, date: datetime) -> None: self.cache["content"]["data"][blob.id] = date self.cache["content"]["added"].add(blob.id) def directory_add_to_revision( self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes ) -> None: self.cache["directory_in_revision"].add( (directory.id, revision.id, path_normalize(path)) ) + def directory_already_flattenned(self, directory: DirectoryEntry) -> Optional[bool]: + cache = self.cache["directory_flatten"] + if directory.id not in cache: + cache.setdefault(directory.id, None) + ret = self.storage.directory_get([directory.id]) + if directory.id in ret: + dir = ret[directory.id] + cache[directory.id] = dir.flat + # date is kept to ensure we have it available when flushing + self.cache["directory"]["data"][directory.id] = dir.date + return cache.get(directory.id) + + def directory_flag_as_flattenned(self, directory: DirectoryEntry) -> None: + self.cache["directory_flatten"][directory.id] = True + def directory_get_date_in_isochrone_frontier( self, directory: DirectoryEntry ) -> Optional[datetime]: return self.get_dates("directory", [directory.id]).get(directory.id) def directory_get_dates_in_isochrone_frontier( self, dirs: Iterable[DirectoryEntry] ) -> Dict[Sha1Git, datetime]: return self.get_dates("directory", [directory.id for directory in dirs]) def directory_set_date_in_isochrone_frontier( self, directory: DirectoryEntry, date: datetime ) -> None: self.cache["directory"]["data"][directory.id] = date self.cache["directory"]["added"].add(directory.id) def get_dates( self, entity: Literal["content", "directory", "revision"], ids: Iterable[Sha1Git], ) -> Dict[Sha1Git, datetime]: cache = self.cache[entity] missing_ids = set(id for id in ids if id not in cache) if missing_ids: if entity == "content": cache["data"].update(self.storage.content_get(missing_ids)) elif entity == "directory": cache["data"].update( { id: dir.date for id, dir in self.storage.directory_get(missing_ids).items() } ) elif entity == "revision": cache["data"].update( { id: rev.date for id, rev in self.storage.revision_get(missing_ids).items() } ) dates: Dict[Sha1Git, datetime] = {} for sha1 in ids: date = cache["data"].setdefault(sha1, None) if date is not None: dates[sha1] = date return dates def open(self) -> None: self.storage.open() def origin_add(self, origin: OriginEntry) -> None: self.cache["origin"]["data"][origin.id] = origin.url self.cache["origin"]["added"].add(origin.id) def revision_add(self, revision: RevisionEntry) -> None: self.cache["revision"]["data"][revision.id] = revision.date self.cache["revision"]["added"].add(revision.id) def revision_add_before_revision( self, head: RevisionEntry, revision: RevisionEntry ) -> None: self.cache["revision_before_revision"].setdefault(revision.id, set()).add( head.id ) def revision_add_to_origin( self, origin: OriginEntry, revision: RevisionEntry ) -> None: self.cache["revision_in_origin"].add((revision.id, origin.id)) def revision_get_date(self, revision: RevisionEntry) -> Optional[datetime]: return self.get_dates("revision", [revision.id]).get(revision.id) def revision_get_preferred_origin( self, revision: RevisionEntry ) -> Optional[Sha1Git]: cache = self.cache["revision_origin"]["data"] if revision.id not in cache: ret = self.storage.revision_get([revision.id]) if revision.id in ret: origin = ret[revision.id].origin if origin is not None: cache[revision.id] = origin return cache.get(revision.id) def revision_in_history(self, revision: RevisionEntry) -> bool: return revision.id in self.cache["revision_before_revision"] or bool( self.storage.relation_get(RelationType.REV_BEFORE_REV, [revision.id]) ) def revision_set_preferred_origin( self, origin: OriginEntry, revision: RevisionEntry ) -> None: self.cache["revision_origin"]["data"][revision.id] = origin.id self.cache["revision_origin"]["added"].add(revision.id) def revision_visited(self, revision: RevisionEntry) -> bool: return revision.id in dict(self.cache["revision_in_origin"]) or bool( self.storage.relation_get(RelationType.REV_IN_ORG, [revision.id]) ) diff --git a/swh/provenance/revision.py b/swh/provenance/revision.py index a6c81ec..2b578e1 100644 --- a/swh/provenance/revision.py +++ b/swh/provenance/revision.py @@ -1,246 +1,224 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timezone -import os from typing import Generator, Iterable, Iterator, List, Optional, Tuple from swh.core.statsd import statsd from swh.model.model import Sha1Git from .archive import ArchiveInterface +from .directory import directory_flatten from .graph import IsochroneNode, build_isochrone_graph from .interface import ProvenanceInterface from .model import DirectoryEntry, RevisionEntry REVISION_DURATION_METRIC = "swh_provenance_revision_content_layer_duration_seconds" class CSVRevisionIterator: """Iterator over revisions typically present in the given CSV file. The input is an iterator that produces 3 elements per row: (id, date, root) where: - id: is the id (sha1_git) of the revision - date: is the author date - root: sha1 of the directory """ def __init__( self, revisions: Iterable[Tuple[Sha1Git, datetime, Sha1Git]], limit: Optional[int] = None, ) -> None: self.revisions: Iterator[Tuple[Sha1Git, datetime, Sha1Git]] if limit is not None: from itertools import islice self.revisions = islice(revisions, limit) else: self.revisions = iter(revisions) def __iter__(self) -> Generator[RevisionEntry, None, None]: for id, date, root in self.revisions: if date.tzinfo is None: date = date.replace(tzinfo=timezone.utc) yield RevisionEntry(id, date=date, root=root) @statsd.timed(metric=REVISION_DURATION_METRIC, tags={"method": "main"}) def revision_add( provenance: ProvenanceInterface, archive: ArchiveInterface, revisions: List[RevisionEntry], trackall: bool = True, lower: bool = True, mindepth: int = 1, minsize: int = 0, commit: bool = True, ) -> None: for revision in revisions: assert revision.date is not None assert revision.root is not None # Processed content starting from the revision's root directory. date = provenance.revision_get_date(revision) if date is None or revision.date < date: graph = build_isochrone_graph( provenance, archive, revision, DirectoryEntry(revision.root), minsize=minsize, ) revision_process_content( provenance, archive, revision, graph, trackall=trackall, lower=lower, mindepth=mindepth, minsize=minsize, ) if commit: provenance.flush() @statsd.timed(metric=REVISION_DURATION_METRIC, tags={"method": "process_content"}) def revision_process_content( provenance: ProvenanceInterface, archive: ArchiveInterface, revision: RevisionEntry, graph: IsochroneNode, trackall: bool = True, lower: bool = True, mindepth: int = 1, minsize: int = 0, ) -> None: assert revision.date is not None provenance.revision_add(revision) stack = [graph] while stack: current = stack.pop() if current.dbdate is not None: assert current.dbdate <= revision.date if trackall: # Current directory is an outer isochrone frontier for a previously # processed revision. It should be reused as is. provenance.directory_add_to_revision( revision, current.entry, current.path ) else: assert current.maxdate is not None # Current directory is not an outer isochrone frontier for any previous # revision. It might be eligible for this one. if is_new_frontier( current, revision=revision, trackall=trackall, lower=lower, mindepth=mindepth, ): # Outer frontier should be moved to current position in the isochrone # graph. This is the first time this directory is found in the isochrone # frontier. provenance.directory_set_date_in_isochrone_frontier( current.entry, current.maxdate ) if trackall: provenance.directory_add_to_revision( revision, current.entry, current.path ) - flatten_directory( + directory_flatten( provenance, archive, current.entry, minsize=minsize ) else: # If current node is an invalidated frontier, update its date for future # revisions to get the proper value. if current.invalid: provenance.directory_set_date_in_isochrone_frontier( current.entry, current.maxdate ) # No point moving the frontier here. Either there are no files or they # are being seen for the first time here. Add all blobs to current # revision updating date if necessary, and recursively analyse # subdirectories as candidates to the outer frontier. for blob in current.entry.files: date = provenance.content_get_early_date(blob) if date is None or revision.date < date: provenance.content_set_early_date(blob, revision.date) provenance.content_add_to_revision(revision, blob, current.path) for child in current.children: stack.append(child) -@statsd.timed(metric=REVISION_DURATION_METRIC, tags={"method": "flatten_directory"}) -def flatten_directory( - provenance: ProvenanceInterface, - archive: ArchiveInterface, - directory: DirectoryEntry, - minsize: int = 0, -) -> None: - """Recursively retrieve all the files of 'directory' and insert them in the - 'provenance' database in the 'content_to_directory' table. - """ - stack = [(directory, b"")] - while stack: - current, prefix = stack.pop() - current.retrieve_children(archive, minsize=minsize) - for f_child in current.files: - # Add content to the directory with the computed prefix. - provenance.content_add_to_directory(directory, f_child, prefix) - for d_child in current.dirs: - # Recursively walk the child directory. - stack.append((d_child, os.path.join(prefix, d_child.name))) - - def is_new_frontier( node: IsochroneNode, revision: RevisionEntry, trackall: bool = True, lower: bool = True, mindepth: int = 1, ) -> bool: assert node.maxdate is not None # for mypy assert revision.date is not None # idem if trackall: # The only real condition for a directory to be a frontier is that its content # is already known and its maxdate is less (or equal) than current revision's # date. Checking mindepth is meant to skip root directories (or any arbitrary # depth) to improve the result. The option lower tries to maximize the reuse # rate of previously defined frontiers by keeping them low in the directory # tree. return ( node.known and node.maxdate <= revision.date # all content is earlier than revision and node.depth >= mindepth # current node is deeper than the min allowed depth and (has_blobs(node) if lower else True) # there is at least one blob in it ) else: # If we are only tracking first occurrences, we want to ensure that all first # occurrences end up in the content_early_in_rev relation. Thus, we force for # every blob outside a frontier to have an strictly earlier date. return ( node.maxdate < revision.date # all content is earlier than revision and node.depth >= mindepth # deeper than the min allowed depth and (has_blobs(node) if lower else True) # there is at least one blob ) def has_blobs(node: IsochroneNode) -> bool: # We may want to look for files in different ways to decide whether to define a # frontier or not: # 1. Only files in current node: return any(node.entry.files) # 2. Files anywhere in the isochrone graph # stack = [node] # while stack: # current = stack.pop() # if any( # map(lambda child: isinstance(child.entry, FileEntry), current.children)): # return True # else: # # All children are directory entries. # stack.extend(current.children) # return False # 3. Files in the intermediate directories between current node and any previously # defined frontier: # TODO: complete this case! # return any( # map(lambda child: isinstance(child.entry, FileEntry), node.children) # ) or all( # map( # lambda child: ( # not (isinstance(child.entry, DirectoryEntry) and child.date is None) # ) # or has_blobs(child), # node.children, # ) # ) diff --git a/swh/provenance/tests/test_cli.py b/swh/provenance/tests/test_cli.py index cceb30a..6ed98a2 100644 --- a/swh/provenance/tests/test_cli.py +++ b/swh/provenance/tests/test_cli.py @@ -1,104 +1,105 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Set from _pytest.monkeypatch import MonkeyPatch from click.testing import CliRunner import psycopg2.extensions import pytest from swh.core.cli import swh as swhmain import swh.core.cli.db # noqa ; ensure cli is loaded from swh.core.db import BaseDb import swh.provenance.cli # noqa ; ensure cli is loaded def test_cli_swh_db_help() -> None: # swhmain.add_command(provenance_cli) result = CliRunner().invoke(swhmain, ["provenance", "-h"]) assert result.exit_code == 0 assert "Commands:" in result.output commands = result.output.split("Commands:")[1] for command in ( "find-all", "find-first", + "iter-frontiers", "iter-origins", "iter-revisions", ): assert f" {command} " in commands TABLES = { "dbflavor", "dbversion", "content", "content_in_revision", "content_in_directory", "directory", "directory_in_revision", "location", "origin", "revision", "revision_before_revision", "revision_in_origin", } @pytest.mark.parametrize( "flavor, dbtables", (("with-path", TABLES | {"location"}), ("without-path", TABLES)) ) def test_cli_db_create_and_init_db_with_flavor( monkeypatch: MonkeyPatch, postgresql: psycopg2.extensions.connection, flavor: str, dbtables: Set[str], ) -> None: """Test that 'swh db init provenance' works with flavors for both with-path and without-path flavors""" dbname = f"{flavor}-db" # DB creation using 'swh db create' db_params = postgresql.get_dsn_parameters() monkeypatch.setenv("PGHOST", db_params["host"]) monkeypatch.setenv("PGUSER", db_params["user"]) monkeypatch.setenv("PGPORT", db_params["port"]) result = CliRunner().invoke(swhmain, ["db", "create", "-d", dbname, "provenance"]) assert result.exit_code == 0, result.output # DB init using 'swh db init' result = CliRunner().invoke( swhmain, ["db", "init", "-d", dbname, "--flavor", flavor, "provenance"] ) assert result.exit_code == 0, result.output assert f"(flavor {flavor})" in result.output db_params["dbname"] = dbname cnx = BaseDb.connect(**db_params).conn # check the DB looks OK (check for db_flavor and expected tables) with cnx.cursor() as cur: cur.execute("select swh_get_dbflavor()") assert cur.fetchone() == (flavor,) cur.execute( "select table_name from information_schema.tables " "where table_schema = 'public' " f"and table_catalog = '{dbname}'" ) tables = set(x for (x,) in cur.fetchall()) assert tables == dbtables def test_cli_init_db_default_flavor(postgresql: psycopg2.extensions.connection) -> None: "Test that 'swh db init provenance' defaults to a with-path flavored DB" dbname = postgresql.dsn result = CliRunner().invoke(swhmain, ["db", "init", "-d", dbname, "provenance"]) assert result.exit_code == 0, result.output with postgresql.cursor() as cur: cur.execute("select swh_get_dbflavor()") assert cur.fetchone() == ("with-path",) diff --git a/swh/provenance/tests/test_directory_flatten.py b/swh/provenance/tests/test_directory_flatten.py new file mode 100644 index 0000000..82f7257 --- /dev/null +++ b/swh/provenance/tests/test_directory_flatten.py @@ -0,0 +1,72 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + + +from datetime import datetime, timezone + +from swh.model.hashutil import hash_to_bytes +from swh.provenance.archive import ArchiveInterface +from swh.provenance.directory import directory_add +from swh.provenance.interface import ( + DirectoryData, + ProvenanceInterface, + RelationData, + RelationType, +) +from swh.provenance.model import DirectoryEntry, FileEntry +from swh.provenance.tests.conftest import fill_storage, load_repo_data + + +def test_directory_add( + provenance: ProvenanceInterface, + archive: ArchiveInterface, +) -> None: + # read data/README.md for more details on how these datasets are generated + data = load_repo_data("cmdbts2") + fill_storage(archive.storage, data) + + # just take a directory that is known to exists in cmdbts2 + directory = DirectoryEntry( + id=hash_to_bytes("48007c961cc734d1f63886d0413a6dc605e3e2ea") + ) + content1 = FileEntry( + id=hash_to_bytes("20329687bb9c1231a7e05afe86160343ad49b494"), name=b"a" + ) + content2 = FileEntry( + id=hash_to_bytes("50e9cdb03f9719261dd39d7f2920b906db3711a3"), name=b"b" + ) + date = datetime.fromtimestamp(1000000010, timezone.utc) + + # directory_add and the internal directory_flatten require the directory and its + # content to be known by the provenance object. Otherwise, they do nothing + provenance.directory_set_date_in_isochrone_frontier(directory, date) + provenance.content_set_early_date(content1, date) + provenance.content_set_early_date(content2, date) + provenance.flush() + assert provenance.storage.directory_get([directory.id]) == { + directory.id: DirectoryData(date=date, flat=False) + } + assert provenance.storage.content_get([content1.id, content2.id]) == { + content1.id: date, + content2.id: date, + } + + # this query forces the directory date to be retrieved from the storage and cached + # (otherwise, the flush below won't update the directory flatten flag) + flattenned = provenance.directory_already_flattenned(directory) + assert flattenned is not None and not flattenned + + # flatten the directory and check the expected result + directory_add(provenance, archive, [directory]) + assert provenance.storage.directory_get([directory.id]) == { + directory.id: DirectoryData(date=date, flat=True) + } + assert provenance.storage.relation_get_all(RelationType.CNT_IN_DIR) == { + content1.id: { + RelationData(dst=directory.id, path=b"a"), + RelationData(dst=directory.id, path=b"C/a"), + }, + content2.id: {RelationData(dst=directory.id, path=b"C/b")}, + } diff --git a/swh/provenance/tests/test_directory_iterator.py b/swh/provenance/tests/test_directory_iterator.py new file mode 100644 index 0000000..f86c8c4 --- /dev/null +++ b/swh/provenance/tests/test_directory_iterator.py @@ -0,0 +1,29 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import pytest + +from swh.provenance.directory import CSVDirectoryIterator +from swh.provenance.tests.conftest import fill_storage, load_repo_data +from swh.storage.interface import StorageInterface + + +@pytest.mark.parametrize( + "repo", + ( + "cmdbts2", + "out-of-order", + ), +) +def test_revision_iterator(swh_storage: StorageInterface, repo: str) -> None: + """Test CSVDirectoryIterator""" + data = load_repo_data(repo) + fill_storage(swh_storage, data) + + directories_ids = [dir["id"] for dir in data["directory"]] + directories = list(CSVDirectoryIterator(directories_ids)) + + assert directories + assert len(directories) == len(data["directory"]) diff --git a/swh/provenance/tests/test_revision_content_layer.py b/swh/provenance/tests/test_revision_content_layer.py index b7bc070..6b81a37 100644 --- a/swh/provenance/tests/test_revision_content_layer.py +++ b/swh/provenance/tests/test_revision_content_layer.py @@ -1,456 +1,453 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import re from typing import Any, Dict, Iterable, Iterator, List, Optional, Set, Tuple import pytest from typing_extensions import TypedDict from swh.model.hashutil import hash_to_bytes from swh.model.model import Sha1Git from swh.provenance.archive import ArchiveInterface from swh.provenance.interface import EntityType, ProvenanceInterface, RelationType from swh.provenance.model import RevisionEntry from swh.provenance.revision import revision_add from swh.provenance.tests.conftest import ( fill_storage, get_datafile, load_repo_data, ts2dt, ) class SynthRelation(TypedDict): prefix: Optional[str] path: str src: Sha1Git dst: Sha1Git rel_ts: float class SynthRevision(TypedDict): sha1: Sha1Git date: float msg: str R_C: List[SynthRelation] R_D: List[SynthRelation] D_C: List[SynthRelation] def synthetic_revision_content_result(filename: str) -> Iterator[SynthRevision]: """Generates dict representations of synthetic revisions found in the synthetic file (from the data/ directory) given as argument of the generator. Generated SynthRevision (typed dict) with the following elements: "sha1": (Sha1Git) sha1 of the revision, "date": (float) timestamp of the revision, "msg": (str) commit message of the revision, "R_C": (list) new R---C relations added by this revision "R_D": (list) new R-D relations added by this revision "D_C": (list) new D-C relations added by this revision Each relation above is a SynthRelation typed dict with: "path": (str) location "src": (Sha1Git) sha1 of the source of the relation "dst": (Sha1Git) sha1 of the destination of the relation "rel_ts": (float) timestamp of the target of the relation (related to the timestamp of the revision) """ with open(get_datafile(filename), "r") as fobj: yield from _parse_synthetic_revision_content_file(fobj) def _parse_synthetic_revision_content_file( fobj: Iterable[str], ) -> Iterator[SynthRevision]: """Read a 'synthetic' file and generate a dict representation of the synthetic revision for each revision listed in the synthetic file. """ regs = [ "(?PR[0-9]{2,4})?", "(?P[^| ]*)", "([+] )?(?P[^| +]*?)[/]?", "(?P[RDC]) (?P[0-9a-f]{40})", "(?P-?[0-9]+(.[0-9]+)?)", ] regex = re.compile("^ *" + r" *[|] *".join(regs) + r" *(#.*)?$") current_rev: List[dict] = [] for m in (regex.match(line) for line in fobj): if m: d = m.groupdict() if d["revname"]: if current_rev: yield _mk_synth_rev(current_rev) current_rev.clear() current_rev.append(d) if current_rev: yield _mk_synth_rev(current_rev) def _mk_synth_rev(synth_rev: List[Dict[str, str]]) -> SynthRevision: assert synth_rev[0]["type"] == "R" rev = SynthRevision( sha1=hash_to_bytes(synth_rev[0]["sha1"]), date=float(synth_rev[0]["ts"]), msg=synth_rev[0]["revname"], R_C=[], R_D=[], D_C=[], ) current_path = None # path of the last R-D relation we parsed, used a prefix for next D-C # relations for row in synth_rev[1:]: if row["reltype"] == "R---C": assert row["type"] == "C" rev["R_C"].append( SynthRelation( prefix=None, path=row["path"], src=rev["sha1"], dst=hash_to_bytes(row["sha1"]), rel_ts=float(row["ts"]), ) ) current_path = None elif row["reltype"] == "R-D": assert row["type"] == "D" rev["R_D"].append( SynthRelation( prefix=None, path=row["path"], src=rev["sha1"], dst=hash_to_bytes(row["sha1"]), rel_ts=float(row["ts"]), ) ) current_path = row["path"] elif row["reltype"] == "D-C": assert row["type"] == "C" rev["D_C"].append( SynthRelation( prefix=current_path, path=row["path"], src=rev["R_D"][-1]["dst"], dst=hash_to_bytes(row["sha1"]), rel_ts=float(row["ts"]), ) ) return rev @pytest.mark.parametrize( "repo, lower, mindepth", ( ("cmdbts2", True, 1), ("cmdbts2", False, 1), ("cmdbts2", True, 2), ("cmdbts2", False, 2), ("out-of-order", True, 1), ), ) def test_revision_content_result( provenance: ProvenanceInterface, archive: ArchiveInterface, repo: str, lower: bool, mindepth: int, ) -> None: # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) fill_storage(archive.storage, data) syntheticfile = get_datafile( f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt" ) revisions = {rev["id"]: rev for rev in data["revision"]} rows: Dict[str, Set[Any]] = { "content": set(), "content_in_directory": set(), "content_in_revision": set(), "directory": set(), "directory_in_revision": set(), "location": set(), "revision": set(), } def maybe_path(path: str) -> Optional[bytes]: if provenance.storage.with_path(): return path.encode("utf-8") return None for synth_rev in synthetic_revision_content_result(syntheticfile): revision = revisions[synth_rev["sha1"]] entry = RevisionEntry( id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], ) revision_add(provenance, archive, [entry], lower=lower, mindepth=mindepth) # each "entry" in the synth file is one new revision rows["revision"].add(synth_rev["sha1"]) assert rows["revision"] == provenance.storage.entity_get_all( EntityType.REVISION ), synth_rev["msg"] # check the timestamp of the revision rev_ts = synth_rev["date"] rev_data = provenance.storage.revision_get([synth_rev["sha1"]])[ synth_rev["sha1"] ] assert ( rev_data.date is not None and rev_ts == rev_data.date.timestamp() ), synth_rev["msg"] # this revision might have added new content objects rows["content"] |= set(x["dst"] for x in synth_rev["R_C"]) rows["content"] |= set(x["dst"] for x in synth_rev["D_C"]) assert rows["content"] == provenance.storage.entity_get_all( EntityType.CONTENT ), synth_rev["msg"] # check for R-C (direct) entries # these are added directly in the content_early_in_rev table rows["content_in_revision"] |= set( (x["dst"], x["src"], maybe_path(x["path"])) for x in synth_rev["R_C"] ) assert rows["content_in_revision"] == { (src, rel.dst, rel.path) for src, rels in provenance.storage.relation_get_all( RelationType.CNT_EARLY_IN_REV ).items() for rel in rels }, synth_rev["msg"] # check timestamps for rc in synth_rev["R_C"]: assert ( rev_ts + rc["rel_ts"] == provenance.storage.content_get([rc["dst"]])[rc["dst"]].timestamp() ), synth_rev["msg"] # check directories # each directory stored in the provenance index is an entry # in the "directory" table... rows["directory"] |= set(x["dst"] for x in synth_rev["R_D"]) assert rows["directory"] == provenance.storage.entity_get_all( EntityType.DIRECTORY ), synth_rev["msg"] # ... + a number of rows in the "directory_in_rev" table... # check for R-D entries rows["directory_in_revision"] |= set( (x["dst"], x["src"], maybe_path(x["path"])) for x in synth_rev["R_D"] ) assert rows["directory_in_revision"] == { (src, rel.dst, rel.path) for src, rels in provenance.storage.relation_get_all( RelationType.DIR_IN_REV ).items() for rel in rels }, synth_rev["msg"] # check timestamps for rd in synth_rev["R_D"]: - assert ( - rev_ts + rd["rel_ts"] - == provenance.storage.directory_get([rd["dst"]])[ - rd["dst"] - ].date.timestamp() - ), synth_rev["msg"] + dir_data = provenance.storage.directory_get([rd["dst"]])[rd["dst"]] + assert rev_ts + rd["rel_ts"] == dir_data.date.timestamp(), synth_rev["msg"] + assert dir_data.flat, synth_rev["msg"] # ... + a number of rows in the "content_in_dir" table # for content of the directory. # check for D-C entries rows["content_in_directory"] |= set( (x["dst"], x["src"], maybe_path(x["path"])) for x in synth_rev["D_C"] ) assert rows["content_in_directory"] == { (src, rel.dst, rel.path) for src, rels in provenance.storage.relation_get_all( RelationType.CNT_IN_DIR ).items() for rel in rels }, synth_rev["msg"] # check timestamps for dc in synth_rev["D_C"]: assert ( rev_ts + dc["rel_ts"] == provenance.storage.content_get([dc["dst"]])[dc["dst"]].timestamp() ), synth_rev["msg"] if provenance.storage.with_path(): # check for location entries rows["location"] |= set(x["path"].encode() for x in synth_rev["R_C"]) rows["location"] |= set(x["path"].encode() for x in synth_rev["D_C"]) rows["location"] |= set(x["path"].encode() for x in synth_rev["R_D"]) assert rows["location"] == provenance.storage.location_get_all(), synth_rev[ "msg" ] @pytest.mark.parametrize( "repo, lower, mindepth", ( ("cmdbts2", True, 1), ("cmdbts2", False, 1), ("cmdbts2", True, 2), ("cmdbts2", False, 2), ("out-of-order", True, 1), ), ) @pytest.mark.parametrize("batch", (True, False)) def test_provenance_heuristics_content_find_all( provenance: ProvenanceInterface, archive: ArchiveInterface, repo: str, lower: bool, mindepth: int, batch: bool, ) -> None: # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) fill_storage(archive.storage, data) revisions = [ RevisionEntry( id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], ) for revision in data["revision"] ] def maybe_path(path: str) -> str: if provenance.storage.with_path(): return path return "" if batch: revision_add(provenance, archive, revisions, lower=lower, mindepth=mindepth) else: for revision in revisions: revision_add( provenance, archive, [revision], lower=lower, mindepth=mindepth ) syntheticfile = get_datafile( f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt" ) expected_occurrences: Dict[str, List[Tuple[str, float, Optional[str], str]]] = {} for synth_rev in synthetic_revision_content_result(syntheticfile): rev_id = synth_rev["sha1"].hex() rev_ts = synth_rev["date"] for rc in synth_rev["R_C"]: expected_occurrences.setdefault(rc["dst"].hex(), []).append( (rev_id, rev_ts, None, maybe_path(rc["path"])) ) for dc in synth_rev["D_C"]: assert dc["prefix"] is not None # to please mypy expected_occurrences.setdefault(dc["dst"].hex(), []).append( (rev_id, rev_ts, None, maybe_path(dc["prefix"] + "/" + dc["path"])) ) for content_id, results in expected_occurrences.items(): expected = [(content_id, *result) for result in results] db_occurrences = [ ( occur.content.hex(), occur.revision.hex(), occur.date.timestamp(), occur.origin, occur.path.decode(), ) for occur in provenance.content_find_all(hash_to_bytes(content_id)) ] if provenance.storage.with_path(): # this is not true if the db stores no path, because a same content # that appears several times in a given revision may be reported # only once by content_find_all() assert len(db_occurrences) == len(expected) assert set(db_occurrences) == set(expected) @pytest.mark.parametrize( "repo, lower, mindepth", ( ("cmdbts2", True, 1), ("cmdbts2", False, 1), ("cmdbts2", True, 2), ("cmdbts2", False, 2), ("out-of-order", True, 1), ), ) @pytest.mark.parametrize("batch", (True, False)) def test_provenance_heuristics_content_find_first( provenance: ProvenanceInterface, archive: ArchiveInterface, repo: str, lower: bool, mindepth: int, batch: bool, ) -> None: # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) fill_storage(archive.storage, data) revisions = [ RevisionEntry( id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], ) for revision in data["revision"] ] if batch: revision_add(provenance, archive, revisions, lower=lower, mindepth=mindepth) else: for revision in revisions: revision_add( provenance, archive, [revision], lower=lower, mindepth=mindepth ) syntheticfile = get_datafile( f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt" ) expected_first: Dict[str, Tuple[str, float, List[str]]] = {} # dict of tuples (blob_id, rev_id, [path, ...]) the third element for path # is a list because a content can be added at several places in a single # revision, in which case the result of content_find_first() is one of # those path, but we have no guarantee which one it will return. for synth_rev in synthetic_revision_content_result(syntheticfile): rev_id = synth_rev["sha1"].hex() rev_ts = synth_rev["date"] for rc in synth_rev["R_C"]: sha1 = rc["dst"].hex() if sha1 not in expected_first: assert rc["rel_ts"] == 0 expected_first[sha1] = (rev_id, rev_ts, [rc["path"]]) else: if rev_ts == expected_first[sha1][1]: expected_first[sha1][2].append(rc["path"]) elif rev_ts < expected_first[sha1][1]: expected_first[sha1] = (rev_id, rev_ts, [rc["path"]]) for dc in synth_rev["D_C"]: sha1 = rc["dst"].hex() assert sha1 in expected_first # nothing to do there, this content cannot be a "first seen file" for content_id, (rev_id, ts, paths) in expected_first.items(): occur = provenance.content_find_first(hash_to_bytes(content_id)) assert occur is not None assert occur.content.hex() == content_id assert occur.revision.hex() == rev_id assert occur.date.timestamp() == ts assert occur.origin is None if provenance.storage.with_path(): assert occur.path.decode() in paths