diff --git a/swh/provenance/cli.py b/swh/provenance/cli.py index b5b3e0b..cc17f0f 100644 --- a/swh/provenance/cli.py +++ b/swh/provenance/cli.py @@ -1,215 +1,213 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # WARNING: do not import unnecessary things here to keep cli startup time under # control import os from typing import Any, Dict, Optional import click import yaml from swh.core import config from swh.core.cli import CONTEXT_SETTINGS from swh.core.cli import swh as swh_cli_group from swh.model.hashutil import hash_to_bytes, hash_to_hex # All generic config code should reside in swh.core.config CONFIG_ENVVAR = "SWH_CONFIG_FILE" DEFAULT_CONFIG_PATH = os.path.join(click.get_app_dir("swh"), "global.yml") DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, DEFAULT_CONFIG_PATH) DEFAULT_CONFIG: Dict[str, Any] = { "archive": { "cls": "api", "storage": { "cls": "remote", "url": "http://uffizi.internal.softwareheritage.org:5002", } # "cls": "direct", # "db": { # "host": "db.internal.softwareheritage.org", # "dbname": "softwareheritage", # "user": "guest" # } }, "provenance": {"cls": "local", "db": {"host": "localhost", "dbname": "provenance"}}, } CONFIG_FILE_HELP = f"""Configuration file: \b The CLI option or the environment variable will fail if invalid. CLI option is checked first. Then, environment variable {CONFIG_ENVVAR} is checked. Then, if cannot load the default path, a set of default values are used. Default config path is {DEFAULT_CONFIG_PATH}. Default config values are: \b {yaml.dump(DEFAULT_CONFIG)}""" PROVENANCE_HELP = f"""Software Heritage Scanner tools. {CONFIG_FILE_HELP}""" @swh_cli_group.group( name="provenance", context_settings=CONTEXT_SETTINGS, help=PROVENANCE_HELP ) @click.option( "-C", "--config-file", default=None, type=click.Path(exists=False, dir_okay=False, path_type=str), help="""YAML configuration file.""", ) @click.option( "-P", "--profile", default=None, type=click.Path(exists=False, dir_okay=False, path_type=str), help="""Enable profiling to specified file.""", ) @click.pass_context def cli(ctx, config_file: Optional[str], profile: str): if config_file is None and config.config_exists(DEFAULT_PATH): config_file = DEFAULT_PATH if config_file is None: conf = DEFAULT_CONFIG else: # read_raw_config do not fail on ENOENT if not config.config_exists(config_file): raise FileNotFoundError(config_file) conf = config.read_raw_config(config.config_basepath(config_file)) conf = config.merge_configs(DEFAULT_CONFIG, conf) ctx.ensure_object(dict) ctx.obj["config"] = conf if profile: import atexit import cProfile print("Profiling...") pr = cProfile.Profile() pr.enable() def exit(): pr.disable() pr.dump_stats(profile) atexit.register(exit) @cli.command(name="create", deprecated=True) @click.option("--maintenance-db", default=None) @click.option("--drop/--no-drop", "drop_db", default=False) @click.pass_context def create(ctx, maintenance_db, drop_db): """Deprecated, please use: swh db create provenance and swh db init provenance instead. 
""" @cli.command(name="iter-revisions") @click.argument("filename") @click.option("-a", "--track-all", default=True, type=bool) @click.option("-l", "--limit", type=int) @click.option("-m", "--min-depth", default=1, type=int) @click.option("-r", "--reuse", default=True, type=bool) @click.pass_context def iter_revisions(ctx, filename, track_all, limit, min_depth, reuse): # TODO: add file size filtering """Process a provided list of revisions.""" from . import get_archive, get_provenance - from .provenance import revision_add - from .revision import CSVRevisionIterator + from .revision import CSVRevisionIterator, revision_add archive = get_archive(**ctx.obj["config"]["archive"]) provenance = get_provenance(**ctx.obj["config"]["provenance"]) revisions_provider = ( line.strip().split(",") for line in open(filename, "r") if line.strip() ) revisions = CSVRevisionIterator(revisions_provider, limit=limit) for revision in revisions: revision_add( provenance, archive, [revision], trackall=track_all, lower=reuse, mindepth=min_depth, ) @cli.command(name="iter-origins") @click.argument("filename") @click.option("-l", "--limit", type=int) @click.pass_context def iter_origins(ctx, filename, limit): """Process a provided list of origins.""" from . import get_archive, get_provenance - from .origin import CSVOriginIterator - from .provenance import origin_add + from .origin import CSVOriginIterator, origin_add archive = get_archive(**ctx.obj["config"]["archive"]) provenance = get_provenance(**ctx.obj["config"]["provenance"]) origins_provider = ( line.strip().split(",") for line in open(filename, "r") if line.strip() ) origins = CSVOriginIterator(origins_provider, limit=limit) for origin in origins: origin_add(provenance, archive, [origin]) @cli.command(name="find-first") @click.argument("swhid") @click.pass_context def find_first(ctx, swhid): """Find first occurrence of the requested blob.""" from . import get_provenance provenance = get_provenance(**ctx.obj["config"]["provenance"]) # TODO: return a dictionary with proper keys for each field row = provenance.content_find_first(hash_to_bytes(swhid)) if row is not None: print( "swh:1:cnt:{cnt}, swh:1:rev:{rev}, {date}, {path}".format( cnt=hash_to_hex(row[0]), rev=hash_to_hex(row[1]), date=row[2], path=os.fsdecode(row[3]), ) ) else: print(f"Cannot find a content with the id {swhid}") @cli.command(name="find-all") @click.argument("swhid") @click.option("-l", "--limit", type=int) @click.pass_context def find_all(ctx, swhid, limit): """Find all occurrences of the requested blob.""" - from swh.provenance import get_provenance + from . 
import get_provenance provenance = get_provenance(**ctx.obj["config"]["provenance"]) # TODO: return a dictionary with proper keys for each field for row in provenance.content_find_all(hash_to_bytes(swhid), limit=limit): print( "swh:1:cnt:{cnt}, swh:1:rev:{rev}, {date}, {path}".format( cnt=hash_to_hex(row[0]), rev=hash_to_hex(row[1]), date=row[2], path=os.fsdecode(row[3]), ) ) diff --git a/swh/provenance/graph.py b/swh/provenance/graph.py new file mode 100644 index 0000000..1615429 --- /dev/null +++ b/swh/provenance/graph.py @@ -0,0 +1,223 @@ +from collections import Counter +from datetime import datetime, timezone +import logging +import os +from typing import Dict, List, Optional + +from swh.model.hashutil import hash_to_hex + +from .archive import ArchiveInterface +from .model import DirectoryEntry, RevisionEntry +from .provenance import ProvenanceInterface + +UTCMIN = datetime.min.replace(tzinfo=timezone.utc) + + +class IsochroneNode: + def __init__( + self, + entry: DirectoryEntry, + dbdate: Optional[datetime] = None, + depth: int = 0, + prefix: bytes = b"", + ): + self.entry = entry + self.depth = depth + + # dbdate is the maxdate for this node that comes from the DB + self._dbdate: Optional[datetime] = dbdate + + # maxdate is set by the maxdate computation algorithm + self.maxdate: Optional[datetime] = None + + # known is True if this node is already known in the db; either because + # the current directory actually exists in the database, or because all + # the content of the current directory is known (subdirectories and files) + self.known = self.dbdate is not None + self.invalid = False + self.path = os.path.join(prefix, self.entry.name) if prefix else self.entry.name + self.children: List[IsochroneNode] = [] + + @property + def dbdate(self): + # use a property to make this attribute (mostly) read-only + return self._dbdate + + def invalidate(self): + self._dbdate = None + self.maxdate = None + self.known = False + self.invalid = True + + def add_directory( + self, child: DirectoryEntry, date: Optional[datetime] = None + ) -> "IsochroneNode": + # we should not be processing this node (ie add subdirectories or + # files) if it's actually known by the provenance DB + assert self.dbdate is None + node = IsochroneNode(child, dbdate=date, depth=self.depth + 1, prefix=self.path) + self.children.append(node) + return node + + def __str__(self): + return ( + f"<{self.entry}: dbdate={self.dbdate}, maxdate={self.maxdate}, " + f"known={self.known}, invalid={self.invalid}, path={self.path}, " + f"children=[{', '.join(str(child) for child in self.children)}]>" + ) + + def __eq__(self, other): + return ( + isinstance(other, IsochroneNode) + and ( + self.entry, + self.depth, + self._dbdate, + self.maxdate, + self.known, + self.invalid, + self.path, + ) + == ( + other.entry, + other.depth, + other._dbdate, + other.maxdate, + other.known, + other.invalid, + other.path, + ) + and Counter(self.children) == Counter(other.children) + ) + + def __hash__(self): + return hash( + ( + self.entry, + self.depth, + self._dbdate, + self.maxdate, + self.known, + self.invalid, + self.path, + ) + ) + + +def build_isochrone_graph( + archive: ArchiveInterface, + provenance: ProvenanceInterface, + revision: RevisionEntry, + directory: DirectoryEntry, +) -> IsochroneNode: + assert revision.date is not None + assert revision.root == directory.id + + # this function process a revision in 2 steps: + # + # 1. 
build the tree structure of IsochroneNode objects (one INode per + # directory under the root directory of the revision but not following + # known subdirectories), and gather the dates from the DB for already + # known objects; for files, just keep all the dates in a global 'fdates' + # dict; note that in this step, we will only recurse the directories + # that are not already known. + # + # 2. compute the maxdate for each node of the tree that was not found in the DB. + + # Build the nodes structure + root_date = provenance.directory_get_date_in_isochrone_frontier(directory) + root = IsochroneNode(directory, dbdate=root_date) + stack = [root] + logging.debug( + f"Recursively creating graph for revision {hash_to_hex(revision.id)}..." + ) + fdates: Dict[bytes, datetime] = {} # map {file_id: date} + while stack: + current = stack.pop() + if current.dbdate is None or current.dbdate > revision.date: + # If current directory has an associated date in the isochrone frontier that + # is greater or equal to the current revision's one, it should be ignored as + # the revision is being processed out of order. + if current.dbdate is not None and current.dbdate > revision.date: + logging.debug( + f"Invalidating frontier on {hash_to_hex(current.entry.id)}" + f" (date {current.dbdate})" + f" when processing revision {hash_to_hex(revision.id)}" + f" (date {revision.date})" + ) + current.invalidate() + + # Pre-query all known dates for directories in the current directory + # for the provenance object to have them cached and (potentially) improve + # performance. + current.entry.retrieve_children(archive) + ddates = provenance.directory_get_dates_in_isochrone_frontier( + current.entry.dirs + ) + for dir in current.entry.dirs: + # Recursively analyse subdirectory nodes + node = current.add_directory(dir, date=ddates.get(dir.id, None)) + stack.append(node) + + fdates.update(provenance.content_get_early_dates(current.entry.files)) + + logging.debug( + f"Isochrone graph for revision {hash_to_hex(revision.id)} successfully created!" + ) + # Precalculate max known date for each node in the graph (only directory nodes are + # pushed to the stack). + logging.debug(f"Computing maxdates for revision {hash_to_hex(revision.id)}...") + stack = [root] + + while stack: + current = stack.pop() + # Current directory node is known if it already has an assigned date (ie. it was + # already seen as an isochrone frontier). + if current.known: + assert current.maxdate is None + current.maxdate = current.dbdate + else: + if any(x.maxdate is None for x in current.children): + # at least one child of current has no maxdate yet + # Current node needs to be analysed again after its children. + stack.append(current) + for child in current.children: + if child.maxdate is None: + # if child.maxdate is None, it must be processed + stack.append(child) + else: + # all the files and directories under current have a maxdate, + # we can infer the maxdate for current directory + assert current.maxdate is None + # if all content is already known, update current directory info. + current.maxdate = max( + [UTCMIN] + + [ + child.maxdate + for child in current.children + if child.maxdate is not None # unnecessary, but needed for mypy + ] + + [ + fdates.get(file.id, revision.date) + for file in current.entry.files + ] + ) + if current.maxdate <= revision.date: + current.known = ( + # true if all subdirectories are known + all(child.known for child in current.children) + # true if all files are in fdates, i.e. 
if all files were known + # *before building this isochrone graph node* + # Note: the 'all()' is lazy: will stop iterating as soon as + # possible + and all((file.id in fdates) for file in current.entry.files) + ) + else: + # at least one content is being processed out-of-order, then current + # node should be treated as unknown + current.maxdate = revision.date + current.known = False + logging.debug( + f"Maxdates for revision {hash_to_hex(revision.id)} successfully computed!" + ) + return root diff --git a/swh/provenance/origin.py b/swh/provenance/origin.py index 82f7709..3317873 100644 --- a/swh/provenance/origin.py +++ b/swh/provenance/origin.py @@ -1,41 +1,122 @@ from datetime import datetime, timezone from itertools import islice -from typing import Iterable, Iterator, Optional, Tuple +import logging +import time +from typing import Iterable, Iterator, List, Optional, Tuple import iso8601 -from .model import OriginEntry +from swh.model.hashutil import hash_to_hex -################################################################################ -################################################################################ +from .archive import ArchiveInterface +from .model import OriginEntry, RevisionEntry +from .provenance import ProvenanceInterface class CSVOriginIterator: """Iterator over origin visit statuses typically present in the given CSV file. The input is an iterator that produces 3 elements per row: (url, date, snap) where: - url: is the origin url of the visit - date: is the date of the visit - snap: sha1_git of the snapshot pointed by the visit status """ def __init__( self, statuses: Iterable[Tuple[str, datetime, bytes]], limit: Optional[int] = None, ): self.statuses: Iterator[Tuple[str, datetime, bytes]] if limit is not None: self.statuses = islice(statuses, limit) else: self.statuses = iter(statuses) def __iter__(self): for url, date, snap in self.statuses: date = iso8601.parse_date(date, default_timezone=timezone.utc) yield OriginEntry(url, date, snap) + + +def origin_add( + provenance: ProvenanceInterface, + archive: ArchiveInterface, + origins: List[OriginEntry], +) -> None: + start = time.time() + for origin in origins: + origin.retrieve_revisions(archive) + for revision in origin.revisions: + origin_add_revision(provenance, archive, origin, revision) + done = time.time() + provenance.commit() + stop = time.time() + logging.debug( + "Origins " + ";".join( + [origin.url + ":" + hash_to_hex(origin.snapshot) for origin in origins] + ) + + f" were processed in {stop - start} secs (commit took {stop - done} secs)!" + ) + + +def origin_add_revision( + provenance: ProvenanceInterface, + archive: ArchiveInterface, + origin: OriginEntry, + revision: RevisionEntry, +) -> None: + stack: List[Tuple[Optional[RevisionEntry], RevisionEntry]] = [(None, revision)] + origin.id = provenance.origin_get_id(origin) + + while stack: + relative, current = stack.pop() + + # Check if current revision has no preferred origin and update if necessary. + preferred = provenance.revision_get_preferred_origin(current) + + if preferred is None: + provenance.revision_set_preferred_origin(origin, current) + ######################################################################## + + if relative is None: + # This revision is pointed directly by the origin. 
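            # A minimal illustrative trace (hypothetical linear history
            # R3 -> R2 -> R1, with the origin's snapshot pointing at R3 and
            # none of the revisions seen before):
            #
            #   stack = [(None, R3)]  -> R3 is added to the origin, (R3, R3) pushed
            #   stack = [(R3, R3)]    -> parent R2 is neither visited nor found in any
            #                            known history, so it is recorded as being
            #                            before R3 and (R3, R2) is pushed
            #   stack = [(R3, R2)]    -> same for R1: recorded before R3, (R3, R1) pushed
            #   stack = [(R3, R1)]    -> R1 has no parents, the stack drains
            #
            # A parent that is already known in some other revision's history
            # would instead be re-queued as (None, parent) and attached directly
            # to the origin.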
+ visited = provenance.revision_visited(current) + provenance.revision_add_to_origin(origin, current) + + if not visited: + stack.append((current, current)) + + else: + # This revision is a parent of another one in the history of the + # relative revision. + for parent in current.parents(archive): + visited = provenance.revision_visited(parent) + + if not visited: + # The parent revision has never been seen before pointing + # directly to an origin. + known = provenance.revision_in_history(parent) + + if known: + # The parent revision is already known in some other + # revision's history. We should point it directly to + # the origin and (eventually) walk its history. + stack.append((None, parent)) + else: + # The parent revision was never seen before. We should + # walk its history and associate it with the same + # relative revision. + provenance.revision_add_before_revision(relative, parent) + stack.append((relative, parent)) + else: + # The parent revision already points to an origin, so its + # history was properly processed before. We just need to + # make sure it points to the current origin as well. + provenance.revision_add_to_origin(origin, parent) diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py index ab30f8c..1e6edb7 100644 --- a/swh/provenance/provenance.py +++ b/swh/provenance/provenance.py @@ -1,766 +1,270 @@ -from collections import Counter -from datetime import datetime, timezone +from datetime import datetime import logging import os -import time from typing import Any, Dict, Generator, Iterable, List, Optional, Tuple import psycopg2 from typing_extensions import Protocol, runtime_checkable -from swh.model.hashutil import hash_to_hex - -from .archive import ArchiveInterface from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry -UTCMIN = datetime.min.replace(tzinfo=timezone.utc) - # XXX: this protocol doesn't make much sense now that flavours have been delegated to # another class, lower in the callstack. @runtime_checkable class ProvenanceInterface(Protocol): raise_on_commit: bool = False def commit(self): """Commit currently ongoing transactions in the backend DB""" ... def content_add_to_directory( self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes ) -> None: ... def content_add_to_revision( self, revision: RevisionEntry, blob: FileEntry, prefix: bytes ) -> None: ... def content_find_first( self, blob: bytes ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]: ... def content_find_all( self, blob: bytes, limit: Optional[int] = None ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]: ... def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: ... def content_get_early_dates( self, blobs: Iterable[FileEntry] ) -> Dict[bytes, datetime]: ... def content_set_early_date(self, blob: FileEntry, date: datetime) -> None: ... def directory_add_to_revision( self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes ) -> None: ... def directory_get_date_in_isochrone_frontier( self, directory: DirectoryEntry ) -> Optional[datetime]: ... def directory_get_dates_in_isochrone_frontier( self, dirs: Iterable[DirectoryEntry] ) -> Dict[bytes, datetime]: ... def directory_set_date_in_isochrone_frontier( self, directory: DirectoryEntry, date: datetime ) -> None: ... def origin_get_id(self, origin: OriginEntry) -> int: ... def revision_add(self, revision: RevisionEntry) -> None: ... def revision_add_before_revision( self, relative: RevisionEntry, revision: RevisionEntry ) -> None: ... 
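    # A minimal usage sketch (assuming a live psycopg2 connection `conn`, a
    # hypothetical variable here; not a definitive API): since this protocol is
    # decorated with @runtime_checkable, concrete backends can be validated
    # structurally at runtime, e.g.
    #
    #     backend = ProvenanceBackend(conn)
    #     assert isinstance(backend, ProvenanceInterface)
    #
    # Note that isinstance() only checks that the declared attributes and
    # methods exist; it does not verify their signatures, so static checking
    # with mypy remains the authority for type correctness.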
def revision_add_to_origin( self, origin: OriginEntry, revision: RevisionEntry ) -> None: ... def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]: ... def revision_get_preferred_origin(self, revision: RevisionEntry) -> int: ... def revision_in_history(self, revision: RevisionEntry) -> bool: ... def revision_set_preferred_origin( self, origin: OriginEntry, revision: RevisionEntry ) -> None: ... def revision_visited(self, revision: RevisionEntry) -> bool: ... # TODO: maybe move this to a separate file class ProvenanceBackend: raise_on_commit: bool = False def __init__(self, conn: psycopg2.extensions.connection, with_path: bool = True): from .postgresql.provenancedb_base import ProvenanceDBBase # TODO: this class should not know what the actual used DB is. self.storage: ProvenanceDBBase if with_path: from .postgresql.provenancedb_with_path import ProvenanceWithPathDB self.storage = ProvenanceWithPathDB(conn) else: from .postgresql.provenancedb_without_path import ProvenanceWithoutPathDB self.storage = ProvenanceWithoutPathDB(conn) self.write_cache: Dict[str, Any] = {} self.read_cache: Dict[str, Any] = {} self.clear_caches() def clear_caches(self): self.write_cache = { "content": dict(), "content_early_in_rev": set(), "content_in_dir": set(), "directory": dict(), "directory_in_rev": set(), "revision": dict(), "revision_before_rev": list(), "revision_in_org": list(), } self.read_cache = {"content": dict(), "directory": dict(), "revision": dict()} def commit(self): # TODO: for now we just forward the write_cache. This should be improved! while not self.storage.commit( self.write_cache, raise_on_commit=self.raise_on_commit ): logging.warning( f"Unable to commit cached information {self.write_cache}. Retrying..." ) self.clear_caches() def content_add_to_directory( self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes ): self.write_cache["content_in_dir"].add( (blob.id, directory.id, normalize(os.path.join(prefix, blob.name))) ) def content_add_to_revision( self, revision: RevisionEntry, blob: FileEntry, prefix: bytes ): self.write_cache["content_early_in_rev"].add( (blob.id, revision.id, normalize(os.path.join(prefix, blob.name))) ) def content_find_first( self, blob: bytes ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]: return self.storage.content_find_first(blob) def content_find_all( self, blob: bytes, limit: Optional[int] = None ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]: yield from self.storage.content_find_all(blob, limit=limit) def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: return self.get_dates("content", [blob.id]).get(blob.id, None) def content_get_early_dates( self, blobs: Iterable[FileEntry] ) -> Dict[bytes, datetime]: return self.get_dates("content", [blob.id for blob in blobs]) def content_set_early_date(self, blob: FileEntry, date: datetime): self.write_cache["content"][blob.id] = date # update read cache as well self.read_cache["content"][blob.id] = date def directory_add_to_revision( self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes ): self.write_cache["directory_in_rev"].add( (directory.id, revision.id, normalize(path)) ) def directory_get_date_in_isochrone_frontier( self, directory: DirectoryEntry ) -> Optional[datetime]: return self.get_dates("directory", [directory.id]).get(directory.id, None) def directory_get_dates_in_isochrone_frontier( self, dirs: Iterable[DirectoryEntry] ) -> Dict[bytes, datetime]: return self.get_dates("directory", [directory.id for directory 
in dirs]) def directory_set_date_in_isochrone_frontier( self, directory: DirectoryEntry, date: datetime ): self.write_cache["directory"][directory.id] = date # update read cache as well self.read_cache["directory"][directory.id] = date def get_dates(self, entity: str, ids: List[bytes]) -> Dict[bytes, datetime]: dates = {} pending = [] for sha1 in ids: # Check whether the date has been queried before date = self.read_cache[entity].get(sha1, None) if date is not None: dates[sha1] = date else: pending.append(sha1) dates.update(self.storage.get_dates(entity, pending)) return dates def origin_get_id(self, origin: OriginEntry) -> int: if origin.id is None: return self.storage.origin_get_id(origin.url) else: return origin.id def revision_add(self, revision: RevisionEntry): # Add current revision to the compact DB self.write_cache["revision"][revision.id] = revision.date # update read cache as well self.read_cache["revision"][revision.id] = revision.date def revision_add_before_revision( self, relative: RevisionEntry, revision: RevisionEntry ): self.write_cache["revision_before_rev"].append((revision.id, relative.id)) def revision_add_to_origin(self, origin: OriginEntry, revision: RevisionEntry): self.write_cache["revision_in_org"].append((revision.id, origin.id)) def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]: return self.get_dates("revision", [revision.id]).get(revision.id, None) def revision_get_preferred_origin(self, revision: RevisionEntry) -> int: # TODO: adapt this method to consider cached values return self.storage.revision_get_preferred_origin(revision.id) def revision_in_history(self, revision: RevisionEntry) -> bool: # TODO: adapt this method to consider cached values return self.storage.revision_in_history(revision.id) def revision_set_preferred_origin( self, origin: OriginEntry, revision: RevisionEntry ): assert origin.id is not None # TODO: adapt this method to consider cached values self.storage.revision_set_preferred_origin(origin.id, revision.id) def revision_visited(self, revision: RevisionEntry) -> bool: # TODO: adapt this method to consider cached values return self.storage.revision_visited(revision.id) def normalize(path: bytes) -> bytes: return path[2:] if path.startswith(bytes("." + os.path.sep, "utf-8")) else path - - -def flatten_directory( - archive: ArchiveInterface, - provenance: ProvenanceInterface, - directory: DirectoryEntry, -) -> None: - """Recursively retrieve all the files of 'directory' and insert them in the - 'provenance' database in the 'content_to_directory' table. - """ - stack = [(directory, b"")] - while stack: - current, prefix = stack.pop() - current.retrieve_children(archive) - for f_child in current.files: - # Add content to the directory with the computed prefix. - provenance.content_add_to_directory(directory, f_child, prefix) - for d_child in current.dirs: - # Recursively walk the child directory. 
- stack.append((d_child, os.path.join(prefix, d_child.name))) - - -def origin_add( - provenance: ProvenanceInterface, - archive: ArchiveInterface, - origins: List[OriginEntry], -) -> None: - start = time.time() - for origin in origins: - origin.retrieve_revisions(archive) - for revision in origin.revisions: - origin_add_revision(provenance, archive, origin, revision) - done = time.time() - provenance.commit() - stop = time.time() - logging.debug( - "Origins " - ";".join( - [origin.url + ":" + hash_to_hex(origin.snapshot) for origin in origins] - ) - + f" were processed in {stop - start} secs (commit took {stop - done} secs)!" - ) - - -def origin_add_revision( - provenance: ProvenanceInterface, - archive: ArchiveInterface, - origin: OriginEntry, - revision: RevisionEntry, -) -> None: - stack: List[Tuple[Optional[RevisionEntry], RevisionEntry]] = [(None, revision)] - origin.id = provenance.origin_get_id(origin) - - while stack: - relative, current = stack.pop() - - # Check if current revision has no preferred origin and update if necessary. - preferred = provenance.revision_get_preferred_origin(current) - - if preferred is None: - provenance.revision_set_preferred_origin(origin, current) - ######################################################################## - - if relative is None: - # This revision is pointed directly by the origin. - visited = provenance.revision_visited(current) - provenance.revision_add_to_origin(origin, current) - - if not visited: - stack.append((current, current)) - - else: - # This revision is a parent of another one in the history of the - # relative revision. - for parent in current.parents(archive): - visited = provenance.revision_visited(parent) - - if not visited: - # The parent revision has never been seen before pointing - # directly to an origin. - known = provenance.revision_in_history(parent) - - if known: - # The parent revision is already known in some other - # revision's history. We should point it directly to - # the origin and (eventually) walk its history. - stack.append((None, parent)) - else: - # The parent revision was never seen before. We should - # walk its history and associate it with the same - # relative revision. - provenance.revision_add_before_revision(relative, parent) - stack.append((relative, parent)) - else: - # The parent revision already points to an origin, so its - # history was properly processed before. We just need to - # make sure it points to the current origin as well. - provenance.revision_add_to_origin(origin, parent) - - -def revision_add( - provenance: ProvenanceInterface, - archive: ArchiveInterface, - revisions: List[RevisionEntry], - trackall: bool = True, - lower: bool = True, - mindepth: int = 1, - commit: bool = True, -) -> None: - start = time.time() - for revision in revisions: - assert revision.date is not None - assert revision.root is not None - # Processed content starting from the revision's root directory. - date = provenance.revision_get_early_date(revision) - if date is None or revision.date < date: - logging.debug( - f"Processing revisions {hash_to_hex(revision.id)}" - f" (known date {date} / revision date {revision.date})..." 
- ) - graph = build_isochrone_graph( - archive, - provenance, - revision, - DirectoryEntry(revision.root), - ) - # TODO: add file size filtering - revision_process_content( - archive, - provenance, - revision, - graph, - trackall=trackall, - lower=lower, - mindepth=mindepth, - ) - done = time.time() - if commit: - provenance.commit() - stop = time.time() - logging.debug( - f"Revisions {';'.join([hash_to_hex(revision.id) for revision in revisions])} " - f" were processed in {stop - start} secs (commit took {stop - done} secs)!" - ) - # logging.critical( - # ";".join([hash_to_hex(revision.id) for revision in revisions]) - # + f",{stop - start},{stop - done}" - # ) - - -class IsochroneNode: - def __init__( - self, - entry: DirectoryEntry, - dbdate: Optional[datetime] = None, - depth: int = 0, - prefix: bytes = b"", - ): - self.entry = entry - self.depth = depth - - # dbdate is the maxdate for this node that comes from the DB - self._dbdate: Optional[datetime] = dbdate - - # maxdate is set by the maxdate computation algorithm - self.maxdate: Optional[datetime] = None - - # known is True if this node is already known in the db; either because - # the current directory actually exists in the database, or because all - # the content of the current directory is known (subdirectories and files) - self.known = self.dbdate is not None - self.invalid = False - self.path = os.path.join(prefix, self.entry.name) if prefix else self.entry.name - self.children: List[IsochroneNode] = [] - - @property - def dbdate(self): - # use a property to make this attribute (mostly) read-only - return self._dbdate - - def invalidate(self): - self._dbdate = None - self.maxdate = None - self.known = False - self.invalid = True - - def add_directory( - self, child: DirectoryEntry, date: Optional[datetime] = None - ) -> "IsochroneNode": - # we should not be processing this node (ie add subdirectories or - # files) if it's actually known by the provenance DB - assert self.dbdate is None - node = IsochroneNode(child, dbdate=date, depth=self.depth + 1, prefix=self.path) - self.children.append(node) - return node - - def __str__(self): - return ( - f"<{self.entry}: dbdate={self.dbdate}, maxdate={self.maxdate}, " - f"known={self.known}, invalid={self.invalid}, path={self.path}, " - f"children=[{', '.join(str(child) for child in self.children)}]>" - ) - - def __eq__(self, other): - return ( - isinstance(other, IsochroneNode) - and ( - self.entry, - self.depth, - self._dbdate, - self.maxdate, - self.known, - self.invalid, - self.path, - ) - == ( - other.entry, - other.depth, - other._dbdate, - other.maxdate, - other.known, - other.invalid, - other.path, - ) - and Counter(self.children) == Counter(other.children) - ) - - def __hash__(self): - return hash( - ( - self.entry, - self.depth, - self._dbdate, - self.maxdate, - self.known, - self.invalid, - self.path, - ) - ) - - -def build_isochrone_graph( - archive: ArchiveInterface, - provenance: ProvenanceInterface, - revision: RevisionEntry, - directory: DirectoryEntry, -) -> IsochroneNode: - assert revision.date is not None - assert revision.root == directory.id - - # this function process a revision in 2 steps: - # - # 1. 
build the tree structure of IsochroneNode objects (one INode per - # directory under the root directory of the revision but not following - # known subdirectories), and gather the dates from the DB for already - # known objects; for files, just keep all the dates in a global 'fdates' - # dict; note that in this step, we will only recurse the directories - # that are not already known. - # - # 2. compute the maxdate for each node of the tree that was not found in the DB. - - # Build the nodes structure - root_date = provenance.directory_get_date_in_isochrone_frontier(directory) - root = IsochroneNode(directory, dbdate=root_date) - stack = [root] - logging.debug( - f"Recursively creating graph for revision {hash_to_hex(revision.id)}..." - ) - fdates: Dict[bytes, datetime] = {} # map {file_id: date} - while stack: - current = stack.pop() - if current.dbdate is None or current.dbdate > revision.date: - # If current directory has an associated date in the isochrone frontier that - # is greater or equal to the current revision's one, it should be ignored as - # the revision is being processed out of order. - if current.dbdate is not None and current.dbdate > revision.date: - logging.debug( - f"Invalidating frontier on {hash_to_hex(current.entry.id)}" - f" (date {current.dbdate})" - f" when processing revision {hash_to_hex(revision.id)}" - f" (date {revision.date})" - ) - current.invalidate() - - # Pre-query all known dates for directories in the current directory - # for the provenance object to have them cached and (potentially) improve - # performance. - current.entry.retrieve_children(archive) - ddates = provenance.directory_get_dates_in_isochrone_frontier( - current.entry.dirs - ) - for dir in current.entry.dirs: - # Recursively analyse subdirectory nodes - node = current.add_directory(dir, date=ddates.get(dir.id, None)) - stack.append(node) - - fdates.update(provenance.content_get_early_dates(current.entry.files)) - - logging.debug( - f"Isochrone graph for revision {hash_to_hex(revision.id)} successfully created!" - ) - # Precalculate max known date for each node in the graph (only directory nodes are - # pushed to the stack). - logging.debug(f"Computing maxdates for revision {hash_to_hex(revision.id)}...") - stack = [root] - - while stack: - current = stack.pop() - # Current directory node is known if it already has an assigned date (ie. it was - # already seen as an isochrone frontier). - if current.known: - assert current.maxdate is None - current.maxdate = current.dbdate - else: - if any(x.maxdate is None for x in current.children): - # at least one child of current has no maxdate yet - # Current node needs to be analysed again after its children. - stack.append(current) - for child in current.children: - if child.maxdate is None: - # if child.maxdate is None, it must be processed - stack.append(child) - else: - # all the files and directories under current have a maxdate, - # we can infer the maxdate for current directory - assert current.maxdate is None - # if all content is already known, update current directory info. - current.maxdate = max( - [UTCMIN] - + [ - child.maxdate - for child in current.children - if child.maxdate is not None # unnecessary, but needed for mypy - ] - + [ - fdates.get(file.id, revision.date) - for file in current.entry.files - ] - ) - if current.maxdate <= revision.date: - current.known = ( - # true if all subdirectories are known - all(child.known for child in current.children) - # true if all files are in fdates, i.e. 
if all files were known - # *before building this isochrone graph node* - # Note: the 'all()' is lazy: will stop iterating as soon as - # possible - and all((file.id in fdates) for file in current.entry.files) - ) - else: - # at least one content is being processed out-of-order, then current - # node should be treated as unknown - current.maxdate = revision.date - current.known = False - logging.debug( - f"Maxdates for revision {hash_to_hex(revision.id)} successfully computed!" - ) - return root - - -def revision_process_content( - archive: ArchiveInterface, - provenance: ProvenanceInterface, - revision: RevisionEntry, - graph: IsochroneNode, - trackall: bool = True, - lower: bool = True, - mindepth: int = 1, -): - assert revision.date is not None - provenance.revision_add(revision) - - stack = [graph] - while stack: - current = stack.pop() - if current.dbdate is not None: - assert current.dbdate <= revision.date - if trackall: - # Current directory is an outer isochrone frontier for a previously - # processed revision. It should be reused as is. - provenance.directory_add_to_revision( - revision, current.entry, current.path - ) - else: - assert current.maxdate is not None - # Current directory is not an outer isochrone frontier for any previous - # revision. It might be eligible for this one. - if is_new_frontier( - current, - revision=revision, - trackall=trackall, - lower=lower, - mindepth=mindepth, - ): - # Outer frontier should be moved to current position in the isochrone - # graph. This is the first time this directory is found in the isochrone - # frontier. - provenance.directory_set_date_in_isochrone_frontier( - current.entry, current.maxdate - ) - if trackall: - provenance.directory_add_to_revision( - revision, current.entry, current.path - ) - flatten_directory(archive, provenance, current.entry) - else: - # If current node is an invalidated frontier, update its date for future - # revisions to get the proper value. - if current.invalid: - provenance.directory_set_date_in_isochrone_frontier( - current.entry, current.maxdate - ) - # No point moving the frontier here. Either there are no files or they - # are being seen for the first time here. Add all blobs to current - # revision updating date if necessary, and recursively analyse - # subdirectories as candidates to the outer frontier. - for blob in current.entry.files: - date = provenance.content_get_early_date(blob) - if date is None or revision.date < date: - provenance.content_set_early_date(blob, revision.date) - provenance.content_add_to_revision(revision, blob, current.path) - for child in current.children: - stack.append(child) - - -def is_new_frontier( - node: IsochroneNode, - revision: RevisionEntry, - trackall: bool = True, - lower: bool = True, - mindepth: int = 1, -) -> bool: - assert node.maxdate is not None # for mypy - assert revision.date is not None # idem - if trackall: - # The only real condition for a directory to be a frontier is that its - # content is already known and its maxdate is less (or equal) than - # current revision's date. Checking mindepth is meant to skip root - # directories (or any arbitrary depth) to improve the result. The - # option lower tries to maximize the reusage rate of previously defined - # frontiers by keeping them low in the directory tree. 
- return ( - node.known - and node.maxdate <= revision.date # all content is earlier than revision - and node.depth - >= mindepth # current node is deeper than the min allowed depth - and (has_blobs(node) if lower else True) # there is at least one blob in it - ) - else: - # If we are only tracking first occurrences, we want to ensure that all first - # occurrences end up in the content_early_in_rev relation. Thus, we force for - # every blob outside a frontier to have an extrictly earlier date. - return ( - node.maxdate < revision.date # all content is earlier than revision - and node.depth >= mindepth # deeper than the min allowed depth - and (has_blobs(node) if lower else True) # there is at least one blob - ) - - -def has_blobs(node: IsochroneNode) -> bool: - # We may want to look for files in different ways to decide whether to define a - # frontier or not: - # 1. Only files in current node: - return any(node.entry.files) - # 2. Files anywhere in the isochrone graph - # stack = [node] - # while stack: - # current = stack.pop() - # if any( - # map(lambda child: isinstance(child.entry, FileEntry), current.children)): - # return True - # else: - # # All children are directory entries. - # stack.extend(current.children) - # return False - # 3. Files in the intermediate directories between current node and any previously - # defined frontier: - # TODO: complete this case! - # return any( - # map(lambda child: isinstance(child.entry, FileEntry), node.children) - # ) or all( - # map( - # lambda child: ( - # not (isinstance(child.entry, DirectoryEntry) and child.date is None) - # ) - # or has_blobs(child), - # node.children, - # ) - # ) diff --git a/swh/provenance/revision.py b/swh/provenance/revision.py index f3d6972..e73fce6 100644 --- a/swh/provenance/revision.py +++ b/swh/provenance/revision.py @@ -1,50 +1,255 @@ from datetime import datetime, timezone from itertools import islice -from typing import Iterable, Iterator, Optional, Tuple +import logging +import os +import time +from typing import Iterable, Iterator, List, Optional, Tuple import iso8601 -from swh.model.hashutil import hash_to_bytes -from swh.provenance.model import RevisionEntry +from swh.model.hashutil import hash_to_bytes, hash_to_hex -######################################################################################## -######################################################################################## +from .archive import ArchiveInterface +from .graph import IsochroneNode, build_isochrone_graph +from .model import DirectoryEntry, RevisionEntry +from .provenance import ProvenanceInterface class CSVRevisionIterator: """Iterator over revisions typically present in the given CSV file. 
The input is an iterator that produces 3 elements per row: (id, date, root) where: - id: is the id (sha1_git) of the revision - date: is the author date - root: sha1 of the directory """ def __init__( self, revisions: Iterable[Tuple[bytes, datetime, bytes]], limit: Optional[int] = None, ): self.revisions: Iterator[Tuple[bytes, datetime, bytes]] if limit is not None: self.revisions = islice(revisions, limit) else: self.revisions = iter(revisions) def __iter__(self): return self def __next__(self): id, date, root = next(self.revisions) date = iso8601.parse_date(date) if date.tzinfo is None: date = date.replace(tzinfo=timezone.utc) return RevisionEntry( hash_to_bytes(id), date=date, root=hash_to_bytes(root), ) + + +def revision_add( + provenance: ProvenanceInterface, + archive: ArchiveInterface, + revisions: List[RevisionEntry], + trackall: bool = True, + lower: bool = True, + mindepth: int = 1, + commit: bool = True, +) -> None: + start = time.time() + for revision in revisions: + assert revision.date is not None + assert revision.root is not None + # Processed content starting from the revision's root directory. + date = provenance.revision_get_early_date(revision) + if date is None or revision.date < date: + logging.debug( + f"Processing revisions {hash_to_hex(revision.id)}" + f" (known date {date} / revision date {revision.date})..." + ) + graph = build_isochrone_graph( + archive, + provenance, + revision, + DirectoryEntry(revision.root), + ) + # TODO: add file size filtering + revision_process_content( + archive, + provenance, + revision, + graph, + trackall=trackall, + lower=lower, + mindepth=mindepth, + ) + done = time.time() + if commit: + provenance.commit() + stop = time.time() + logging.debug( + f"Revisions {';'.join([hash_to_hex(revision.id) for revision in revisions])} " + f" were processed in {stop - start} secs (commit took {stop - done} secs)!" + ) + # logging.critical( + # ";".join([hash_to_hex(revision.id) for revision in revisions]) + # + f",{stop - start},{stop - done}" + # ) + + +def revision_process_content( + archive: ArchiveInterface, + provenance: ProvenanceInterface, + revision: RevisionEntry, + graph: IsochroneNode, + trackall: bool = True, + lower: bool = True, + mindepth: int = 1, +): + assert revision.date is not None + provenance.revision_add(revision) + + stack = [graph] + while stack: + current = stack.pop() + if current.dbdate is not None: + assert current.dbdate <= revision.date + if trackall: + # Current directory is an outer isochrone frontier for a previously + # processed revision. It should be reused as is. + provenance.directory_add_to_revision( + revision, current.entry, current.path + ) + else: + assert current.maxdate is not None + # Current directory is not an outer isochrone frontier for any previous + # revision. It might be eligible for this one. + if is_new_frontier( + current, + revision=revision, + trackall=trackall, + lower=lower, + mindepth=mindepth, + ): + # Outer frontier should be moved to current position in the isochrone + # graph. This is the first time this directory is found in the isochrone + # frontier. + provenance.directory_set_date_in_isochrone_frontier( + current.entry, current.maxdate + ) + if trackall: + provenance.directory_add_to_revision( + revision, current.entry, current.path + ) + flatten_directory(archive, provenance, current.entry) + else: + # If current node is an invalidated frontier, update its date for future + # revisions to get the proper value. 
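                # A worked illustration with hypothetical dates (not taken from
                # the test data sets): suppose directory D entered the isochrone
                # frontier while processing a revision dated 2010-01-01, so that
                # date was stored for D. If a revision dated 2005-01-01 that
                # also contains D is processed afterwards (out of order),
                # build_isochrone_graph() invalidates D's node because the
                # stored date is newer than the revision being processed. The
                # branch below then rewrites D's frontier date with the freshly
                # recomputed maxdate, so later revisions compare against the
                # corrected, earlier value.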
+ if current.invalid: + provenance.directory_set_date_in_isochrone_frontier( + current.entry, current.maxdate + ) + # No point moving the frontier here. Either there are no files or they + # are being seen for the first time here. Add all blobs to current + # revision updating date if necessary, and recursively analyse + # subdirectories as candidates to the outer frontier. + for blob in current.entry.files: + date = provenance.content_get_early_date(blob) + if date is None or revision.date < date: + provenance.content_set_early_date(blob, revision.date) + provenance.content_add_to_revision(revision, blob, current.path) + for child in current.children: + stack.append(child) + + +def flatten_directory( + archive: ArchiveInterface, + provenance: ProvenanceInterface, + directory: DirectoryEntry, +) -> None: + """Recursively retrieve all the files of 'directory' and insert them in the + 'provenance' database in the 'content_to_directory' table. + """ + stack = [(directory, b"")] + while stack: + current, prefix = stack.pop() + current.retrieve_children(archive) + for f_child in current.files: + # Add content to the directory with the computed prefix. + provenance.content_add_to_directory(directory, f_child, prefix) + for d_child in current.dirs: + # Recursively walk the child directory. + stack.append((d_child, os.path.join(prefix, d_child.name))) + + +def is_new_frontier( + node: IsochroneNode, + revision: RevisionEntry, + trackall: bool = True, + lower: bool = True, + mindepth: int = 1, +) -> bool: + assert node.maxdate is not None # for mypy + assert revision.date is not None # idem + if trackall: + # The only real condition for a directory to be a frontier is that its + # content is already known and its maxdate is less (or equal) than + # current revision's date. Checking mindepth is meant to skip root + # directories (or any arbitrary depth) to improve the result. The + # option lower tries to maximize the reusage rate of previously defined + # frontiers by keeping them low in the directory tree. + return ( + node.known + and node.maxdate <= revision.date # all content is earlier than revision + and node.depth + >= mindepth # current node is deeper than the min allowed depth + and (has_blobs(node) if lower else True) # there is at least one blob in it + ) + else: + # If we are only tracking first occurrences, we want to ensure that all first + # occurrences end up in the content_early_in_rev relation. Thus, we force for + # every blob outside a frontier to have an extrictly earlier date. + return ( + node.maxdate < revision.date # all content is earlier than revision + and node.depth >= mindepth # deeper than the min allowed depth + and (has_blobs(node) if lower else True) # there is at least one blob + ) + + +def has_blobs(node: IsochroneNode) -> bool: + # We may want to look for files in different ways to decide whether to define a + # frontier or not: + # 1. Only files in current node: + return any(node.entry.files) + # 2. Files anywhere in the isochrone graph + # stack = [node] + # while stack: + # current = stack.pop() + # if any( + # map(lambda child: isinstance(child.entry, FileEntry), current.children)): + # return True + # else: + # # All children are directory entries. + # stack.extend(current.children) + # return False + # 3. Files in the intermediate directories between current node and any previously + # defined frontier: + # TODO: complete this case! 
+ # return any( + # map(lambda child: isinstance(child.entry, FileEntry), node.children) + # ) or all( + # map( + # lambda child: ( + # not (isinstance(child.entry, DirectoryEntry) and child.date is None) + # ) + # or has_blobs(child), + # node.children, + # ) + # ) diff --git a/swh/provenance/tests/test_isochrone_graph.py b/swh/provenance/tests/test_isochrone_graph.py index 963982c..f534a32 100644 --- a/swh/provenance/tests/test_isochrone_graph.py +++ b/swh/provenance/tests/test_isochrone_graph.py @@ -1,100 +1,101 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from copy import deepcopy from datetime import datetime, timezone import pytest import yaml from swh.model.hashutil import hash_to_bytes +from swh.provenance.graph import IsochroneNode, build_isochrone_graph from swh.provenance.model import DirectoryEntry, RevisionEntry -from swh.provenance.provenance import IsochroneNode, build_isochrone_graph, revision_add +from swh.provenance.revision import revision_add from swh.provenance.tests.conftest import fill_storage, get_datafile, load_repo_data from swh.provenance.tests.test_provenance_db import ts2dt def isochrone_graph_from_dict(d, depth=0) -> IsochroneNode: """Takes a dictionary representing a tree of IsochroneNode objects, and recursively builds the corresponding graph.""" d = deepcopy(d) d["entry"]["id"] = hash_to_bytes(d["entry"]["id"]) d["entry"]["name"] = bytes(d["entry"]["name"], encoding="utf-8") dbdate = d.get("dbdate", None) if dbdate is not None: dbdate = datetime.fromtimestamp(d["dbdate"], timezone.utc) children = d.get("children", []) node = IsochroneNode( entry=DirectoryEntry(**d["entry"]), dbdate=dbdate, depth=depth, ) node.maxdate = datetime.fromtimestamp(d["maxdate"], timezone.utc) node.known = d.get("known", False) node.invalid = d.get("invalid", False) node.path = bytes(d["path"], encoding="utf-8") node.children = [ isochrone_graph_from_dict(child, depth=depth + 1) for child in children ] return node @pytest.mark.parametrize( "repo, lower, mindepth", ( ("cmdbts2", True, 1), ("cmdbts2", False, 1), ("cmdbts2", True, 2), ("cmdbts2", False, 2), ("out-of-order", True, 1), ), ) @pytest.mark.parametrize("batch", (True, False)) def test_isochrone_graph( provenance, swh_storage, archive, repo, lower, mindepth, batch ): # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) fill_storage(swh_storage, data) revisions = {rev["id"]: rev for rev in data["revision"]} filename = f"graphs_{repo}_{'lower' if lower else 'upper'}_{mindepth}.yaml" with open(get_datafile(filename)) as file: for expected in yaml.full_load(file): print("# Processing revision", expected["rev"]) revision = revisions[hash_to_bytes(expected["rev"])] entry = RevisionEntry( id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], ) expected_graph = isochrone_graph_from_dict(expected["graph"]) print("Expected graph:", expected_graph) # Create graph for current revision and check it has the expected structure. computed_graph = build_isochrone_graph( archive, provenance, entry, DirectoryEntry(entry.root), ) print("Computed graph:", computed_graph) assert computed_graph == expected_graph # Add current revision so that provenance info is kept up to date for the # following ones. 
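            # For reference, each entry of the YAML list in the graphs_*.yaml
            # files is expected to look roughly as follows (a hedged sketch with
            # made-up hashes and timestamps; isochrone_graph_from_dict() above
            # lists the exact keys, with dbdate/known/invalid/children being
            # optional):
            #
            #   - rev: "0123456789abcdef0123456789abcdef01234567"
            #     graph:
            #       entry:
            #         id: "fedcba9876543210fedcba9876543210fedcba98"
            #         name: ""
            #       maxdate: 1000000000.0
            #       known: true
            #       path: ""
            #       children:
            #         - entry:
            #             id: "00112233445566778899aabbccddeeff00112233"
            #             name: "A"
            #           maxdate: 1000000000.0
            #           known: true
            #           path: "A"
            #           children: []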
revision_add( provenance, archive, [entry], lower=lower, mindepth=mindepth, commit=not batch, ) diff --git a/swh/provenance/tests/test_provenance_db.py b/swh/provenance/tests/test_provenance_db.py index f98d059..cf9b22a 100644 --- a/swh/provenance/tests/test_provenance_db.py +++ b/swh/provenance/tests/test_provenance_db.py @@ -1,31 +1,31 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from swh.model.tests.swh_model_data import TEST_OBJECTS -from swh.provenance.origin import OriginEntry -from swh.provenance.provenance import origin_add +from swh.provenance.model import OriginEntry +from swh.provenance.origin import origin_add from swh.provenance.storage.archive import ArchiveStorage def ts2dt(ts: dict) -> datetime.datetime: timestamp = datetime.datetime.fromtimestamp( ts["timestamp"]["seconds"], datetime.timezone(datetime.timedelta(minutes=ts["offset"])), ) return timestamp.replace(microsecond=ts["timestamp"]["microseconds"]) def test_provenance_origin_add(provenance, swh_storage_with_objects): """Test the origin_add function""" archive = ArchiveStorage(swh_storage_with_objects) for status in TEST_OBJECTS["origin_visit_status"]: if status.snapshot is not None: entry = OriginEntry( url=status.origin, date=status.date, snapshot=status.snapshot ) origin_add(provenance, archive, [entry]) # TODO: check some facts here diff --git a/swh/provenance/tests/test_provenance_heuristics.py b/swh/provenance/tests/test_provenance_heuristics.py index c55005c..fe9d62c 100644 --- a/swh/provenance/tests/test_provenance_heuristics.py +++ b/swh/provenance/tests/test_provenance_heuristics.py @@ -1,325 +1,325 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Dict, List, Tuple import pytest from swh.provenance.model import RevisionEntry -from swh.provenance.provenance import revision_add +from swh.provenance.revision import revision_add from swh.provenance.tests.conftest import ( fill_storage, get_datafile, load_repo_data, synthetic_result, ) from swh.provenance.tests.test_provenance_db import ts2dt def sha1s(cur, table): """return the 'sha1' column from the DB 'table' (as hex) 'cur' is a cursor to the provenance index DB. """ cur.execute(f"SELECT sha1 FROM {table}") return set(sha1.hex() for (sha1,) in cur.fetchall()) def locations(cur): """return the 'path' column from the DB location table 'cur' is a cursor to the provenance index DB. """ cur.execute("SELECT encode(location.path::bytea, 'escape') FROM location") return set(x for (x,) in cur.fetchall()) def relations(cur, src, dst): """return the triplets ('sha1', 'sha1', 'path') from the DB for the relation between 'src' table and 'dst' table (i.e. for C-R, C-D and D-R relations). 'cur' is a cursor to the provenance index DB. 
""" relation = { ("content", "revision"): "content_early_in_rev", ("content", "directory"): "content_in_dir", ("directory", "revision"): "directory_in_rev", }[(src, dst)] srccol = {"content": "blob", "directory": "dir"}[src] dstcol = {"directory": "dir", "revision": "rev"}[dst] cur.execute( f"SELECT encode(src.sha1::bytea, 'hex')," f" encode(dst.sha1::bytea, 'hex')," f" encode(location.path::bytea, 'escape') " f"FROM {relation} as rel, " f" {src} as src, {dst} as dst, location " f"WHERE rel.{srccol}=src.id AND rel.{dstcol}=dst.id AND rel.loc=location.id" ) return set(cur.fetchall()) def get_timestamp(cur, table, sha1): """return the date for the 'sha1' from the DB 'table' (as hex) 'cur' is a cursor to the provenance index DB. """ if isinstance(sha1, str): sha1 = bytes.fromhex(sha1) cur.execute(f"SELECT date FROM {table} WHERE sha1=%s", (sha1,)) return [date.timestamp() for (date,) in cur.fetchall()] @pytest.mark.parametrize( "repo, lower, mindepth", ( ("cmdbts2", True, 1), ("cmdbts2", False, 1), ("cmdbts2", True, 2), ("cmdbts2", False, 2), ("out-of-order", True, 1), ), ) def test_provenance_heuristics(provenance, swh_storage, archive, repo, lower, mindepth): # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) fill_storage(swh_storage, data) syntheticfile = get_datafile( f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt" ) revisions = {rev["id"]: rev for rev in data["revision"]} rows = { "content": set(), "content_in_dir": set(), "content_early_in_rev": set(), "directory": set(), "directory_in_rev": set(), "location": set(), "revision": set(), } for synth_rev in synthetic_result(syntheticfile): revision = revisions[synth_rev["sha1"]] entry = RevisionEntry( id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], ) revision_add(provenance, archive, [entry], lower=lower, mindepth=mindepth) # each "entry" in the synth file is one new revision rows["revision"].add(synth_rev["sha1"].hex()) assert rows["revision"] == sha1s( provenance.storage.cursor, "revision" ), synth_rev["msg"] # check the timestamp of the revision rev_ts = synth_rev["date"] assert get_timestamp( provenance.storage.cursor, "revision", synth_rev["sha1"].hex() ) == [rev_ts], synth_rev["msg"] # this revision might have added new content objects rows["content"] |= set(x["dst"].hex() for x in synth_rev["R_C"]) rows["content"] |= set(x["dst"].hex() for x in synth_rev["D_C"]) assert rows["content"] == sha1s( provenance.storage.cursor, "content" ), synth_rev["msg"] # check for R-C (direct) entries # these are added directly in the content_early_in_rev table rows["content_early_in_rev"] |= set( (x["dst"].hex(), x["src"].hex(), x["path"]) for x in synth_rev["R_C"] ) assert rows["content_early_in_rev"] == relations( provenance.storage.cursor, "content", "revision" ), synth_rev["msg"] # check timestamps for rc in synth_rev["R_C"]: assert get_timestamp(provenance.storage.cursor, "content", rc["dst"]) == [ rev_ts + rc["rel_ts"] ], synth_rev["msg"] # check directories # each directory stored in the provenance index is an entry # in the "directory" table... rows["directory"] |= set(x["dst"].hex() for x in synth_rev["R_D"]) assert rows["directory"] == sha1s( provenance.storage.cursor, "directory" ), synth_rev["msg"] # ... + a number of rows in the "directory_in_rev" table... 
         # check for R-D entries
         rows["directory_in_rev"] |= set(
             (x["dst"].hex(), x["src"].hex(), x["path"]) for x in synth_rev["R_D"]
         )
         assert rows["directory_in_rev"] == relations(
             provenance.storage.cursor, "directory", "revision"
         ), synth_rev["msg"]
         # check timestamps
         for rd in synth_rev["R_D"]:
             assert get_timestamp(provenance.storage.cursor, "directory", rd["dst"]) == [
                 rev_ts + rd["rel_ts"]
             ], synth_rev["msg"]
 
         # ... + a number of rows in the "content_in_dir" table
         #     for content of the directory.
         # check for D-C entries
         rows["content_in_dir"] |= set(
             (x["dst"].hex(), x["src"].hex(), x["path"]) for x in synth_rev["D_C"]
         )
         assert rows["content_in_dir"] == relations(
             provenance.storage.cursor, "content", "directory"
         ), synth_rev["msg"]
         # check timestamps
         for dc in synth_rev["D_C"]:
             assert get_timestamp(provenance.storage.cursor, "content", dc["dst"]) == [
                 rev_ts + dc["rel_ts"]
             ], synth_rev["msg"]
 
         # check for location entries
         rows["location"] |= set(x["path"] for x in synth_rev["R_C"])
         rows["location"] |= set(x["path"] for x in synth_rev["D_C"])
         rows["location"] |= set(x["path"] for x in synth_rev["R_D"])
         assert rows["location"] == locations(provenance.storage.cursor), synth_rev[
             "msg"
         ]
 
 
 @pytest.mark.parametrize(
     "repo, lower, mindepth",
     (
         ("cmdbts2", True, 1),
         ("cmdbts2", False, 1),
         ("cmdbts2", True, 2),
         ("cmdbts2", False, 2),
         ("out-of-order", True, 1),
     ),
 )
 @pytest.mark.parametrize("batch", (True, False))
 def test_provenance_heuristics_content_find_all(
     provenance, swh_storage, archive, repo, lower, mindepth, batch
 ):
     # read data/README.md for more details on how these datasets are generated
     data = load_repo_data(repo)
     fill_storage(swh_storage, data)
     revisions = [
         RevisionEntry(
             id=revision["id"],
             date=ts2dt(revision["date"]),
             root=revision["directory"],
         )
         for revision in data["revision"]
     ]
 
     if batch:
         revision_add(provenance, archive, revisions, lower=lower, mindepth=mindepth)
     else:
         for revision in revisions:
             revision_add(
                 provenance, archive, [revision], lower=lower, mindepth=mindepth
             )
 
     syntheticfile = get_datafile(
         f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt"
     )
     expected_occurrences = {}
     for synth_rev in synthetic_result(syntheticfile):
         rev_id = synth_rev["sha1"].hex()
         rev_ts = synth_rev["date"]
 
         for rc in synth_rev["R_C"]:
             expected_occurrences.setdefault(rc["dst"].hex(), []).append(
                 (rev_id, rev_ts, rc["path"])
             )
         for dc in synth_rev["D_C"]:
             assert dc["prefix"] is not None  # to please mypy
             expected_occurrences.setdefault(dc["dst"].hex(), []).append(
                 (rev_id, rev_ts, dc["prefix"] + "/" + dc["path"])
             )
 
     for content_id, results in expected_occurrences.items():
         expected = [(content_id, *result) for result in results]
         db_occurrences = [
             (blob.hex(), rev.hex(), date.timestamp(), path.decode())
             for blob, rev, date, path in provenance.content_find_all(
                 bytes.fromhex(content_id)
             )
         ]
         assert len(db_occurrences) == len(expected)
         assert set(db_occurrences) == set(expected)
 
 
 @pytest.mark.parametrize(
     "repo, lower, mindepth",
     (
         ("cmdbts2", True, 1),
         ("cmdbts2", False, 1),
         ("cmdbts2", True, 2),
         ("cmdbts2", False, 2),
         ("out-of-order", True, 1),
     ),
 )
 def test_provenance_heuristics_content_find_first(
     provenance, swh_storage, archive, repo, lower, mindepth
 ):
     # read data/README.md for more details on how these datasets are generated
     data = load_repo_data(repo)
     fill_storage(swh_storage, data)
     revisions = [
         RevisionEntry(
             id=revision["id"],
             date=ts2dt(revision["date"]),
             root=revision["directory"],
         )
         for revision in data["revision"]
     ]
 
     # XXX adding all revisions at once should be working just fine,
     # but it does not...
     # revision_add(provenance, archive, revisions, lower=lower, mindepth=mindepth)
     # ...so add revisions one at a time for now
     for revision in revisions:
         revision_add(provenance, archive, [revision], lower=lower, mindepth=mindepth)
 
     syntheticfile = get_datafile(
         f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt"
     )
     expected_first: Dict[str, Tuple[str, float, List[str]]] = {}
     # maps each blob_id to a tuple (rev_id, rev_ts, [path, ...]); the last
     # element is a list because a content can be added at several places in a
     # single revision, in which case the result of content_find_first() is one
     # of those paths, but we have no guarantee which one it will return.
     for synth_rev in synthetic_result(syntheticfile):
         rev_id = synth_rev["sha1"].hex()
         rev_ts = synth_rev["date"]
 
         for rc in synth_rev["R_C"]:
             sha1 = rc["dst"].hex()
             if sha1 not in expected_first:
                 assert rc["rel_ts"] == 0
                 expected_first[sha1] = (rev_id, rev_ts, [rc["path"]])
             else:
                 if rev_ts == expected_first[sha1][1]:
                     expected_first[sha1][2].append(rc["path"])
                 elif rev_ts < expected_first[sha1][1]:
                     expected_first[sha1] = (rev_id, rev_ts, [rc["path"]])
 
         for dc in synth_rev["D_C"]:
             sha1 = dc["dst"].hex()
             assert sha1 in expected_first
             # nothing to do there, this content cannot be a "first seen file"
 
     for content_id, (rev_id, ts, paths) in expected_first.items():
         (r_sha1, r_rev_id, r_ts, r_path) = provenance.content_find_first(
             bytes.fromhex(content_id)
         )
         assert r_sha1.hex() == content_id
         assert r_rev_id.hex() == rev_id
         assert r_ts.timestamp() == ts
         assert r_path.decode() in paths
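
Note (not part of the patch): the hunks above relocate origin_add() to swh.provenance.origin, revision_add() to swh.provenance.revision, and OriginEntry to swh.provenance.model, so any caller outside the test suite needs the same import updates. Below is a minimal sketch of such a caller using only the signatures visible in the tests; the `storage` and `provenance` objects, the identifiers, the URL, and the index_one_revision_and_origin helper are illustrative assumptions, not code introduced by this diff.

# Minimal sketch of a caller after the refactoring (assumptions marked below).
import datetime

from swh.model.hashutil import hash_to_bytes
from swh.provenance.model import OriginEntry, RevisionEntry
from swh.provenance.origin import origin_add
from swh.provenance.revision import revision_add
from swh.provenance.storage.archive import ArchiveStorage


def index_one_revision_and_origin(provenance, storage):
    # `provenance` and `storage` are assumed to be preconfigured objects of
    # the kinds the tests' fixtures provide (provenance index + swh.storage).
    archive = ArchiveStorage(storage)

    # Index one revision, with the same knobs the tests pass
    # (lower-bound reuse and minimum directory depth).
    revision = RevisionEntry(
        id=hash_to_bytes("c0ffee" * 6 + "c0ff"),      # hypothetical revision sha1
        date=datetime.datetime.now(tz=datetime.timezone.utc),
        root=hash_to_bytes("facade" * 6 + "face"),    # hypothetical root directory sha1
    )
    revision_add(provenance, archive, [revision], lower=True, mindepth=1)

    # Index one origin visit pointing at a snapshot.
    origin = OriginEntry(
        url="https://example.org/repo.git",           # hypothetical origin URL
        date=datetime.datetime.now(tz=datetime.timezone.utc),
        snapshot=hash_to_bytes("deadbeef" * 5),       # hypothetical snapshot sha1
    )
    origin_add(provenance, archive, [origin])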