diff --git a/swh/provenance/cli.py b/swh/provenance/cli.py index 5f1206a..b5b3e0b 100644 --- a/swh/provenance/cli.py +++ b/swh/provenance/cli.py @@ -1,211 +1,215 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # WARNING: do not import unnecessary things here to keep cli startup time under # control import os from typing import Any, Dict, Optional import click import yaml from swh.core import config from swh.core.cli import CONTEXT_SETTINGS from swh.core.cli import swh as swh_cli_group from swh.model.hashutil import hash_to_bytes, hash_to_hex # All generic config code should reside in swh.core.config CONFIG_ENVVAR = "SWH_CONFIG_FILE" DEFAULT_CONFIG_PATH = os.path.join(click.get_app_dir("swh"), "global.yml") DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, DEFAULT_CONFIG_PATH) DEFAULT_CONFIG: Dict[str, Any] = { "archive": { "cls": "api", "storage": { "cls": "remote", "url": "http://uffizi.internal.softwareheritage.org:5002", } # "cls": "direct", # "db": { # "host": "db.internal.softwareheritage.org", # "dbname": "softwareheritage", # "user": "guest" # } }, "provenance": {"cls": "local", "db": {"host": "localhost", "dbname": "provenance"}}, } CONFIG_FILE_HELP = f"""Configuration file: \b The CLI option or the environment variable will fail if invalid. CLI option is checked first. Then, environment variable {CONFIG_ENVVAR} is checked. Then, if cannot load the default path, a set of default values are used. Default config path is {DEFAULT_CONFIG_PATH}. Default config values are: \b {yaml.dump(DEFAULT_CONFIG)}""" PROVENANCE_HELP = f"""Software Heritage Scanner tools. {CONFIG_FILE_HELP}""" @swh_cli_group.group( name="provenance", context_settings=CONTEXT_SETTINGS, help=PROVENANCE_HELP ) @click.option( "-C", "--config-file", default=None, type=click.Path(exists=False, dir_okay=False, path_type=str), help="""YAML configuration file.""", ) @click.option( "-P", "--profile", default=None, type=click.Path(exists=False, dir_okay=False, path_type=str), help="""Enable profiling to specified file.""", ) @click.pass_context def cli(ctx, config_file: Optional[str], profile: str): if config_file is None and config.config_exists(DEFAULT_PATH): config_file = DEFAULT_PATH if config_file is None: conf = DEFAULT_CONFIG else: # read_raw_config do not fail on ENOENT if not config.config_exists(config_file): raise FileNotFoundError(config_file) conf = config.read_raw_config(config.config_basepath(config_file)) conf = config.merge_configs(DEFAULT_CONFIG, conf) ctx.ensure_object(dict) ctx.obj["config"] = conf if profile: import atexit import cProfile print("Profiling...") pr = cProfile.Profile() pr.enable() def exit(): pr.disable() pr.dump_stats(profile) atexit.register(exit) @cli.command(name="create", deprecated=True) @click.option("--maintenance-db", default=None) @click.option("--drop/--no-drop", "drop_db", default=False) @click.pass_context def create(ctx, maintenance_db, drop_db): """Deprecated, please use: swh db create provenance and swh db init provenance instead. 
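For reference, a configuration file passed with -C/--config-file (or via SWH_CONFIG_FILE) only needs to override the keys it cares about: cli() merges it over DEFAULT_CONFIG with config.merge_configs. A minimal sketch, with placeholder host names, expressed through the same yaml module the help text already uses:

```python
# Sketch of a user-supplied config equivalent in shape to DEFAULT_CONFIG.
# The storage URL and database settings below are placeholders, not real
# endpoints; merge_configs() fills in anything left out.
import yaml

user_config = yaml.safe_load(
    """
    archive:
      cls: api
      storage:
        cls: remote
        url: http://storage.example.org:5002
    provenance:
      cls: local
      db:
        host: localhost
        dbname: provenance
    """
)
assert set(user_config) == {"archive", "provenance"}
```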
""" @cli.command(name="iter-revisions") @click.argument("filename") @click.option("-a", "--track-all", default=True, type=bool) @click.option("-l", "--limit", type=int) @click.option("-m", "--min-depth", default=1, type=int) @click.option("-r", "--reuse", default=True, type=bool) @click.pass_context def iter_revisions(ctx, filename, track_all, limit, min_depth, reuse): # TODO: add file size filtering """Process a provided list of revisions.""" from . import get_archive, get_provenance from .provenance import revision_add from .revision import CSVRevisionIterator archive = get_archive(**ctx.obj["config"]["archive"]) provenance = get_provenance(**ctx.obj["config"]["provenance"]) revisions_provider = ( line.strip().split(",") for line in open(filename, "r") if line.strip() ) revisions = CSVRevisionIterator(revisions_provider, limit=limit) for revision in revisions: revision_add( provenance, archive, [revision], trackall=track_all, lower=reuse, mindepth=min_depth, ) @cli.command(name="iter-origins") @click.argument("filename") @click.option("-l", "--limit", type=int) @click.pass_context def iter_origins(ctx, filename, limit): """Process a provided list of origins.""" from . import get_archive, get_provenance - from .origin import FileOriginIterator + from .origin import CSVOriginIterator from .provenance import origin_add archive = get_archive(**ctx.obj["config"]["archive"]) provenance = get_provenance(**ctx.obj["config"]["provenance"]) + origins_provider = ( + line.strip().split(",") for line in open(filename, "r") if line.strip() + ) + origins = CSVOriginIterator(origins_provider, limit=limit) - for origin in FileOriginIterator(filename, archive, limit=limit): - origin_add(archive, provenance, origin) + for origin in origins: + origin_add(provenance, archive, [origin]) @cli.command(name="find-first") @click.argument("swhid") @click.pass_context def find_first(ctx, swhid): """Find first occurrence of the requested blob.""" from . 
import get_provenance provenance = get_provenance(**ctx.obj["config"]["provenance"]) # TODO: return a dictionary with proper keys for each field row = provenance.content_find_first(hash_to_bytes(swhid)) if row is not None: print( "swh:1:cnt:{cnt}, swh:1:rev:{rev}, {date}, {path}".format( cnt=hash_to_hex(row[0]), rev=hash_to_hex(row[1]), date=row[2], path=os.fsdecode(row[3]), ) ) else: print(f"Cannot find a content with the id {swhid}") @cli.command(name="find-all") @click.argument("swhid") @click.option("-l", "--limit", type=int) @click.pass_context def find_all(ctx, swhid, limit): """Find all occurrences of the requested blob.""" from swh.provenance import get_provenance provenance = get_provenance(**ctx.obj["config"]["provenance"]) # TODO: return a dictionary with proper keys for each field for row in provenance.content_find_all(hash_to_bytes(swhid), limit=limit): print( "swh:1:cnt:{cnt}, swh:1:rev:{rev}, {date}, {path}".format( cnt=hash_to_hex(row[0]), rev=hash_to_hex(row[1]), date=row[2], path=os.fsdecode(row[3]), ) ) diff --git a/swh/provenance/model.py b/swh/provenance/model.py index 3ab0214..57456fd 100644 --- a/swh/provenance/model.py +++ b/swh/provenance/model.py @@ -1,103 +1,151 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime -from typing import Iterable, Iterator, List, Optional +from typing import Iterable, Iterator, List, Optional, Set + +from swh.core.utils import grouper +from swh.model.model import ObjectType, TargetType from .archive import ArchiveInterface class OriginEntry: - def __init__(self, url, revisions: Iterable["RevisionEntry"], id=None): - self.id = id + def __init__( + self, url: str, date: datetime, snapshot: bytes, id: Optional[int] = None + ): self.url = url - self.revisions = revisions + self.date = date + self.snapshot = snapshot + self.id = id + self._revisions: Optional[List[RevisionEntry]] = None + + def retrieve_revisions(self, archive: ArchiveInterface): + if self._revisions is None: + snapshot = archive.snapshot_get_all_branches(self.snapshot) + assert snapshot is not None + targets_set = set() + releases_set = set() + if snapshot is not None: + for branch in snapshot.branches: + if snapshot.branches[branch].target_type == TargetType.REVISION: + targets_set.add(snapshot.branches[branch].target) + elif snapshot.branches[branch].target_type == TargetType.RELEASE: + releases_set.add(snapshot.branches[branch].target) + + batchsize = 100 + for releases in grouper(releases_set, batchsize): + targets_set.update( + release.target + for release in archive.revision_get(releases) + if release is not None + and release.target_type == ObjectType.REVISION + ) + + revisions: Set[RevisionEntry] = set() + for targets in grouper(targets_set, batchsize): + revisions.update( + RevisionEntry(revision.id) + for revision in archive.revision_get(targets) + if revision is not None + ) + + self._revisions = list(revisions) + + @property + def revisions(self) -> Iterator["RevisionEntry"]: + if self._revisions is None: + raise RuntimeError( + "Revisions of this node has not yet been retrieved. " + "Please call retrieve_revisions() before using this property." 
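A quick usage sketch for the lazy-loading OriginEntry introduced above: revisions are only fetched when retrieve_revisions() is called, and reading .revisions before that raises RuntimeError. The URL, date and snapshot id below are placeholders, and the snapshot must actually exist in the configured archive.

```python
from datetime import datetime, timezone

from swh.provenance import get_archive
from swh.provenance.cli import DEFAULT_CONFIG
from swh.provenance.model import OriginEntry

archive = get_archive(**DEFAULT_CONFIG["archive"])  # any ArchiveInterface works

entry = OriginEntry(
    url="https://example.org/user/repo.git",          # placeholder origin URL
    date=datetime(2021, 1, 1, tzinfo=timezone.utc),   # visit date
    snapshot=bytes.fromhex("1f" * 20),                 # placeholder snapshot sha1_git
)
entry.retrieve_revisions(archive)   # resolves branch and release targets
for revision in entry.revisions:    # raises RuntimeError if called too early
    print(revision.id.hex())
```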
+ ) + return (x for x in self._revisions) class RevisionEntry: def __init__( self, id: bytes, date: Optional[datetime] = None, root: Optional[bytes] = None, parents: Optional[Iterable[bytes]] = None, ): self.id = id self.date = date assert self.date is None or self.date.tzinfo is not None self.root = root self._parents = parents self._nodes: List[RevisionEntry] = [] def parents(self, archive: ArchiveInterface): if self._parents is None: revision = archive.revision_get([self.id]) if revision: - self._parents = revision[0].parents + self._parents = list(revision)[0].parents if self._parents and not self._nodes: self._nodes = [ RevisionEntry( id=rev.id, root=rev.directory, date=rev.date, parents=rev.parents, ) for rev in archive.revision_get(self._parents) if rev ] yield from self._nodes def __str__(self): return f"" class DirectoryEntry: def __init__(self, id: bytes, name: bytes = b""): self.id = id self.name = name self._files: Optional[List[FileEntry]] = None self._dirs: Optional[List[DirectoryEntry]] = None def retrieve_children(self, archive: ArchiveInterface): if self._files is None and self._dirs is None: self._files = [] self._dirs = [] for child in archive.directory_ls(self.id): if child["type"] == "dir": self._dirs.append( DirectoryEntry(child["target"], name=child["name"]) ) elif child["type"] == "file": self._files.append(FileEntry(child["target"], child["name"])) @property def files(self) -> Iterator["FileEntry"]: if self._files is None: raise RuntimeError( "Children of this node has not yet been retrieved. " "Please call retrieve_children() before using this property." ) return (x for x in self._files) @property def dirs(self) -> Iterator["DirectoryEntry"]: if self._dirs is None: raise RuntimeError( "Children of this node has not yet been retrieved. " "Please call retrieve_children() before using this property." ) return (x for x in self._dirs) def __str__(self): return f"" class FileEntry: def __init__(self, id: bytes, name: bytes): self.id = id self.name = name def __str__(self): return f"" diff --git a/swh/provenance/origin.py b/swh/provenance/origin.py index 92c7cb0..82f7709 100644 --- a/swh/provenance/origin.py +++ b/swh/provenance/origin.py @@ -1,91 +1,41 @@ -from typing import List, Optional +from datetime import datetime, timezone +from itertools import islice +from typing import Iterable, Iterator, Optional, Tuple -from swh.model.model import ObjectType, Origin, TargetType +import iso8601 -from .archive import ArchiveInterface -from .model import OriginEntry, RevisionEntry +from .model import OriginEntry ################################################################################ ################################################################################ -class FileOriginIterator: - """Iterator over origins present in the given CSV file.""" +class CSVOriginIterator: + """Iterator over origin visit statuses typically present in the given CSV + file. 
- def __init__( - self, filename: str, archive: ArchiveInterface, limit: Optional[int] = None - ): - self.file = open(filename) - self.limit = limit - self.archive = archive - - def __iter__(self): - yield from iterate_statuses( - [Origin(url.strip()) for url in self.file], self.archive, self.limit - ) + The input is an iterator that produces 3 elements per row: + (url, date, snap) -class ArchiveOriginIterator: - """Iterator over origins present in the given storage.""" + where: + - url: is the origin url of the visit + - date: is the date of the visit + - snap: sha1_git of the snapshot pointed by the visit status + """ - def __init__(self, archive: ArchiveInterface, limit: Optional[int] = None): - self.limit = limit - self.archive = archive + def __init__( + self, + statuses: Iterable[Tuple[str, datetime, bytes]], + limit: Optional[int] = None, + ): + self.statuses: Iterator[Tuple[str, datetime, bytes]] + if limit is not None: + self.statuses = islice(statuses, limit) + else: + self.statuses = iter(statuses) def __iter__(self): - yield from iterate_statuses( - self.archive.iter_origins(), self.archive, self.limit - ) - - -def iterate_statuses( - origins: List[Origin], archive: ArchiveInterface, limit: Optional[int] = None -): - idx = 0 - for origin in origins: - for visit in archive.iter_origin_visits(origin.url): - for status in archive.iter_origin_visit_statuses(origin.url, visit.visit): - snapshot = archive.snapshot_get_all_branches(status.snapshot) - if snapshot is None: - continue - # TODO: may filter only those whose status is 'full'?? - targets_set = set() - releases_set = set() - if snapshot is not None: - for branch in snapshot.branches: - if snapshot.branches[branch].target_type == TargetType.REVISION: - targets_set.add(snapshot.branches[branch].target) - elif ( - snapshot.branches[branch].target_type == TargetType.RELEASE - ): - releases_set.add(snapshot.branches[branch].target) - - # This is done to keep the query in release_get small, hence avoiding - # a timeout. - batchsize = 100 - while releases_set: - releases = [ - releases_set.pop() for i in range(batchsize) if releases_set - ] - for release in archive.release_get(releases): - if release is not None: - if release.target_type == ObjectType.REVISION: - targets_set.add(release.target) - - # This is done to keep the query in revision_get small, hence avoiding - # a timeout. 
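CSVOriginIterator is deliberately simple: it wraps an iterable of (url, date, snapshot) rows, optionally truncated by limit, and yields OriginEntry objects with the date parsed by iso8601. A self-contained sketch with placeholder values, mirroring what iter-origins feeds it from its input file:

```python
from swh.provenance.origin import CSVOriginIterator

rows = [
    (
        "https://example.org/user/repo.git",   # placeholder origin URL
        "2021-02-01T12:00:00+00:00",           # visit date as ISO 8601 string
        bytes.fromhex("2a" * 20),              # placeholder snapshot sha1_git
    ),
]
for origin in CSVOriginIterator(rows, limit=10):
    print(origin.url, origin.date, origin.snapshot.hex())
```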
- revisions = set() - while targets_set: - targets = [ - targets_set.pop() for i in range(batchsize) if targets_set - ] - for revision in archive.revision_get(targets): - if revision is not None: - revisions.add(RevisionEntry(revision.id)) - # target_set |= set(revision.parents) - - yield OriginEntry(status.origin, list(revisions)) - - idx += 1 - if idx == limit: - return + for url, date, snap in self.statuses: + date = iso8601.parse_date(date, default_timezone=timezone.utc) + yield OriginEntry(url, date, snap) diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py index d09c70f..6477cca 100644 --- a/swh/provenance/provenance.py +++ b/swh/provenance/provenance.py @@ -1,543 +1,553 @@ from datetime import datetime, timezone import logging import os import time from typing import Dict, Generator, Iterable, List, Optional, Tuple from typing_extensions import Protocol, runtime_checkable from swh.model.hashutil import hash_to_hex from .archive import ArchiveInterface from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry UTCMIN = datetime.min.replace(tzinfo=timezone.utc) @runtime_checkable class ProvenanceInterface(Protocol): raise_on_commit: bool = False def commit(self): """Commit currently ongoing transactions in the backend DB""" ... def content_add_to_directory( self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes ) -> None: ... def content_add_to_revision( self, revision: RevisionEntry, blob: FileEntry, prefix: bytes ) -> None: ... def content_find_first( self, blobid: bytes ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]: ... def content_find_all( self, blobid: bytes, limit: Optional[int] = None ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]: ... def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: ... def content_get_early_dates( self, blobs: Iterable[FileEntry] ) -> Dict[bytes, datetime]: ... def content_set_early_date(self, blob: FileEntry, date: datetime) -> None: ... def directory_add_to_revision( self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes ) -> None: ... def directory_get_date_in_isochrone_frontier( self, directory: DirectoryEntry ) -> Optional[datetime]: ... def directory_get_dates_in_isochrone_frontier( self, dirs: Iterable[DirectoryEntry] ) -> Dict[bytes, datetime]: ... def directory_invalidate_in_isochrone_frontier( self, directory: DirectoryEntry ) -> None: ... def directory_set_date_in_isochrone_frontier( self, directory: DirectoryEntry, date: datetime ) -> None: ... def origin_get_id(self, origin: OriginEntry) -> int: ... def revision_add(self, revision: RevisionEntry) -> None: ... def revision_add_before_revision( self, relative: RevisionEntry, revision: RevisionEntry ) -> None: ... def revision_add_to_origin( self, origin: OriginEntry, revision: RevisionEntry ) -> None: ... def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]: ... def revision_get_preferred_origin(self, revision: RevisionEntry) -> int: ... def revision_in_history(self, revision: RevisionEntry) -> bool: ... def revision_set_preferred_origin( self, origin: OriginEntry, revision: RevisionEntry ) -> None: ... def revision_visited(self, revision: RevisionEntry) -> bool: ... def flatten_directory( archive: ArchiveInterface, provenance: ProvenanceInterface, directory: DirectoryEntry, ) -> None: """Recursively retrieve all the files of 'directory' and insert them in the 'provenance' database in the 'content_to_directory' table. 
""" stack = [(directory, b"")] while stack: current, prefix = stack.pop() current.retrieve_children(archive) for f_child in current.files: # Add content to the directory with the computed prefix. provenance.content_add_to_directory(directory, f_child, prefix) for d_child in current.dirs: # Recursively walk the child directory. stack.append((d_child, os.path.join(prefix, d_child.name))) def origin_add( - archive: ArchiveInterface, provenance: ProvenanceInterface, origin: OriginEntry + provenance: ProvenanceInterface, + archive: ArchiveInterface, + origins: List[OriginEntry], ) -> None: - # TODO: refactor to iterate over origin visit statuses and commit only once - # per status. - origin.id = provenance.origin_get_id(origin) - for revision in origin.revisions: - origin_add_revision(archive, provenance, origin, revision) - # Commit after each revision - provenance.commit() # TODO: verify this! + start = time.time() + for origin in origins: + origin.retrieve_revisions(archive) + for revision in origin.revisions: + origin_add_revision(provenance, archive, origin, revision) + done = time.time() + provenance.commit() + stop = time.time() + logging.debug( + "Origins " + ";".join( + [origin.url + ":" + hash_to_hex(origin.snapshot) for origin in origins] + ) + + f" were processed in {stop - start} secs (commit took {stop - done} secs)!" + ) def origin_add_revision( - archive: ArchiveInterface, provenance: ProvenanceInterface, + archive: ArchiveInterface, origin: OriginEntry, revision: RevisionEntry, ) -> None: stack: List[Tuple[Optional[RevisionEntry], RevisionEntry]] = [(None, revision)] while stack: relative, current = stack.pop() # Check if current revision has no preferred origin and update if necessary. preferred = provenance.revision_get_preferred_origin(current) if preferred is None: provenance.revision_set_preferred_origin(origin, current) ######################################################################## if relative is None: # This revision is pointed directly by the origin. visited = provenance.revision_visited(current) provenance.revision_add_to_origin(origin, current) if not visited: stack.append((current, current)) else: # This revision is a parent of another one in the history of the # relative revision. for parent in current.parents(archive): visited = provenance.revision_visited(parent) if not visited: # The parent revision has never been seen before pointing # directly to an origin. known = provenance.revision_in_history(parent) if known: # The parent revision is already known in some other # revision's history. We should point it directly to # the origin and (eventually) walk its history. stack.append((None, parent)) else: # The parent revision was never seen before. We should # walk its history and associate it with the same # relative revision. provenance.revision_add_before_revision(relative, parent) stack.append((relative, parent)) else: # The parent revision already points to an origin, so its # history was properly processed before. We just need to # make sure it points to the current origin as well. provenance.revision_add_to_origin(origin, parent) def revision_add( provenance: ProvenanceInterface, archive: ArchiveInterface, revisions: List[RevisionEntry], trackall: bool = True, lower: bool = True, mindepth: int = 1, ) -> None: start = time.time() for revision in revisions: assert revision.date is not None assert revision.root is not None # Processed content starting from the revision's root directory. 
date = provenance.revision_get_early_date(revision) if date is None or revision.date < date: logging.debug( f"Processing revisions {hash_to_hex(revision.id)}" f" (known date {date} / revision date {revision.date})..." ) graph = build_isochrone_graph( archive, provenance, revision, DirectoryEntry(revision.root), ) # TODO: add file size filtering revision_process_content( archive, provenance, revision, graph, trackall=trackall, lower=lower, mindepth=mindepth, ) done = time.time() # TODO: improve this! Maybe using a max attempt counter? # Ideally Provenance class should guarantee that a commit never fails. while not provenance.commit(): logging.warning( "Could not commit revisions " + ";".join([hash_to_hex(revision.id) for revision in revisions]) + ". Retrying..." ) stop = time.time() logging.debug( f"Revisions {';'.join([hash_to_hex(revision.id) for revision in revisions])} " f" were processed in {stop - start} secs (commit took {stop - done} secs)!" ) # logging.critical( # ";".join([hash_to_hex(revision.id) for revision in revisions]) # + f",{stop - start},{stop - done}" # ) class IsochroneNode: def __init__( self, entry: DirectoryEntry, dbdate: Optional[datetime] = None, depth: int = 0, prefix: bytes = b"", ): self.entry = entry self.depth = depth # dbdate is the maxdate for this node that comes from the DB self._dbdate: Optional[datetime] = dbdate # maxdate is set by the maxdate computation algorithm self.maxdate: Optional[datetime] = None # known is True if this node is already known in the db; either because # the current directory actually exists in the database, or because all # the content of the current directory is known (subdirectories and files) self.known: bool = self.dbdate is not None self.path = os.path.join(prefix, self.entry.name) if prefix else self.entry.name self.children: List[IsochroneNode] = [] def __str__(self): return ( f"<{self.entry.__class__.__name__}[{self.entry.name}]: " f"known={self.known}, maxdate={self.maxdate}, dbdate={self.dbdate}>" ) @property def dbdate(self): # use a property to make this attribute (mostly) read-only return self._dbdate def invalidate(self): self._dbdate = None self.maxdate = None self.known = False def add_directory( self, child: DirectoryEntry, date: Optional[datetime] = None ) -> "IsochroneNode": # we should not be processing this node (ie add subdirectories or # files) if it's actually known by the provenance DB assert self.dbdate is None node = IsochroneNode(child, dbdate=date, depth=self.depth + 1, prefix=self.path) self.children.append(node) return node def build_isochrone_graph( archive: ArchiveInterface, provenance: ProvenanceInterface, revision: RevisionEntry, directory: DirectoryEntry, ) -> IsochroneNode: assert revision.date is not None assert revision.root == directory.id # this function process a revision in 2 steps: # # 1. build the tree structure of IsochroneNode objects (one INode per # directory under the root directory of the revision but not following # known subdirectories), and gather the dates from the DB for already # known objects; for files, just keep all the dates in a global 'fdates' # dict; note that in this step, we will only recurse the directories # that are not already known. # # 2. compute the maxdate for each node of the tree that was not found in the DB. 
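revision_add() above is the counterpart for revisions, as driven by the iter-revisions command: it needs a timezone-aware date and the revision's root directory id, and keeps retrying the commit until it succeeds. A sketch with placeholder identifiers, set up the same way as the origin_add sketch:

```python
from datetime import datetime, timezone

from swh.provenance import get_archive, get_provenance
from swh.provenance.cli import DEFAULT_CONFIG
from swh.provenance.model import RevisionEntry
from swh.provenance.provenance import revision_add

archive = get_archive(**DEFAULT_CONFIG["archive"])
provenance = get_provenance(**DEFAULT_CONFIG["provenance"])

rev = RevisionEntry(
    id=bytes.fromhex("3b" * 20),                      # placeholder revision sha1_git
    date=datetime(2021, 3, 1, tzinfo=timezone.utc),   # must be timezone-aware
    root=bytes.fromhex("4c" * 20),                    # placeholder root directory id
)
revision_add(provenance, archive, [rev], trackall=True, lower=True, mindepth=1)
```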
# Build the nodes structure root_date = provenance.directory_get_date_in_isochrone_frontier(directory) root = IsochroneNode(directory, dbdate=root_date) stack = [root] logging.debug( f"Recursively creating graph for revision {hash_to_hex(revision.id)}..." ) fdates: Dict[bytes, datetime] = {} # map {file_id: date} while stack: current = stack.pop() if current.dbdate is None or current.dbdate > revision.date: # If current directory has an associated date in the isochrone frontier that # is greater or equal to the current revision's one, it should be ignored as # the revision is being processed out of order. if current.dbdate is not None and current.dbdate > revision.date: logging.debug( f"Invalidating frontier on {hash_to_hex(current.entry.id)}" f" (date {current.dbdate})" f" when processing revision {hash_to_hex(revision.id)}" f" (date {revision.date})" ) provenance.directory_invalidate_in_isochrone_frontier(current.entry) current.invalidate() # Pre-query all known dates for directories in the current directory # for the provenance object to have them cached and (potentially) improve # performance. current.entry.retrieve_children(archive) ddates = provenance.directory_get_dates_in_isochrone_frontier( current.entry.dirs ) for dir in current.entry.dirs: # Recursively analyse subdirectory nodes node = current.add_directory(dir, date=ddates.get(dir.id, None)) stack.append(node) fdates.update(provenance.content_get_early_dates(current.entry.files)) logging.debug( f"Isochrone graph for revision {hash_to_hex(revision.id)} successfully created!" ) # Precalculate max known date for each node in the graph (only directory nodes are # pushed to the stack). logging.debug(f"Computing maxdates for revision {hash_to_hex(revision.id)}...") stack = [root] while stack: current = stack.pop() # Current directory node is known if it already has an assigned date (ie. it was # already seen as an isochrone frontier). if current.known: assert current.maxdate is None current.maxdate = current.dbdate else: if any(x.maxdate is None for x in current.children): # at least one child of current has no maxdate yet # Current node needs to be analysed again after its children. stack.append(current) for child in current.children: if child.maxdate is None: # if child.maxdate is None, it must be processed stack.append(child) else: # all the files and directories under current have a maxdate, # we can infer the maxdate for current directory assert current.maxdate is None # If all content is already known, update current directory info. current.maxdate = max( [UTCMIN] + [ child.maxdate for child in current.children if child.maxdate is not None # unnecessary, but needed for mypy ] + [ fdates.get(file.id, revision.date) for file in current.entry.files ] ) current.known = ( # true if all subdirectories are known all(child.known for child in current.children) # true if all files are in fdates, i.e. if all files were known # *before building this isochrone graph node* # Note: the 'all()' is lazy: will stop iterating as soon as possible and all((file.id in fdates) for file in current.entry.files) ) logging.debug( f"Maxdates for revision {hash_to_hex(revision.id)} successfully computed!" 
) return root def revision_process_content( archive: ArchiveInterface, provenance: ProvenanceInterface, revision: RevisionEntry, graph: IsochroneNode, trackall: bool = True, lower: bool = True, mindepth: int = 1, ): assert revision.date is not None provenance.revision_add(revision) stack = [graph] while stack: current = stack.pop() if current.dbdate is not None: assert current.dbdate <= revision.date if trackall: # Current directory is an outer isochrone frontier for a previously # processed revision. It should be reused as is. provenance.directory_add_to_revision( revision, current.entry, current.path ) else: # Current directory is not an outer isochrone frontier for any previous # revision. It might be eligible for this one. if is_new_frontier( current, revision=revision, trackall=trackall, lower=lower, mindepth=mindepth, ): assert current.maxdate is not None # Outer frontier should be moved to current position in the isochrone # graph. This is the first time this directory is found in the isochrone # frontier. provenance.directory_set_date_in_isochrone_frontier( current.entry, current.maxdate ) if trackall: provenance.directory_add_to_revision( revision, current.entry, current.path ) flatten_directory(archive, provenance, current.entry) else: # No point moving the frontier here. Either there are no files or they # are being seen for the first time here. Add all blobs to current # revision updating date if necessary, and recursively analyse # subdirectories as candidates to the outer frontier. for blob in current.entry.files: date = provenance.content_get_early_date(blob) if date is None or revision.date < date: provenance.content_set_early_date(blob, revision.date) provenance.content_add_to_revision(revision, blob, current.path) for child in current.children: stack.append(child) def is_new_frontier( node: IsochroneNode, revision: RevisionEntry, trackall: bool = True, lower: bool = True, mindepth: int = 1, ) -> bool: assert node.maxdate is not None # for mypy assert revision.date is not None # idem if trackall: # The only real condition for a directory to be a frontier is that its # content is already known and its maxdate is less (or equal) than # current revision's date. Checking mindepth is meant to skip root # directories (or any arbitrary depth) to improve the result. The # option lower tries to maximize the reusage rate of previously defined # frontiers by keeping them low in the directory tree. return ( node.known and node.maxdate <= revision.date # all content is earlier than revision and node.depth >= mindepth # current node is deeper than the min allowed depth and (has_blobs(node) if lower else True) # there is at least one blob in it ) else: # If we are only tracking first occurrences, we want to ensure that all first # occurrences end up in the content_early_in_rev relation. Thus, we force for # every blob outside a frontier to have an extrictly earlier date. return ( node.maxdate < revision.date # all content is earlier than revision and node.depth >= mindepth # deeper than the min allowed depth and (has_blobs(node) if lower else True) # there is at least one blob ) def has_blobs(node: IsochroneNode) -> bool: # We may want to look for files in different ways to decide whether to define a # frontier or not: # 1. Only files in current node: return any(node.entry.files) # 2. 
Files anywhere in the isochrone graph # stack = [node] # while stack: # current = stack.pop() # if any( # map(lambda child: isinstance(child.entry, FileEntry), current.children)): # return True # else: # # All children are directory entries. # stack.extend(current.children) # return False # 3. Files in the intermediate directories between current node and any previously # defined frontier: # TODO: complete this case! # return any( # map(lambda child: isinstance(child.entry, FileEntry), node.children) # ) or all( # map( # lambda child: ( # not (isinstance(child.entry, DirectoryEntry) and child.date is None) # ) # or has_blobs(child), # node.children, # ) # ) diff --git a/swh/provenance/tests/test_origin_iterator.py b/swh/provenance/tests/test_origin_iterator.py index 21cd9d1..724db34 100644 --- a/swh/provenance/tests/test_origin_iterator.py +++ b/swh/provenance/tests/test_origin_iterator.py @@ -1,24 +1,37 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import pytest from swh.model.tests.swh_model_data import TEST_OBJECTS -from swh.provenance.origin import ArchiveOriginIterator +from swh.provenance.origin import CSVOriginIterator +from swh.storage.algos.origin import ( + iter_origins, + iter_origin_visits, + iter_origin_visit_statuses, +) -def test_archive_direct_origin_iterator(swh_storage_with_objects, archive_direct): - """Test ArchiveOriginIterator against the ArchivePostgreSQL""" - # XXX - pytest.xfail("Iterate Origins is currently unsupported by ArchivePostgreSQL") - origins = list(ArchiveOriginIterator(archive_direct)) +def test_origin_iterator(swh_storage_with_objects): + """Test CSVOriginIterator""" + origins_csv = [] + for origin in iter_origins(swh_storage_with_objects): + for visit in iter_origin_visits(swh_storage_with_objects, origin.url): + for status in iter_origin_visit_statuses( + swh_storage_with_objects, origin.url, visit.visit + ): + if status.snapshot is not None: + origins_csv.append( + (status.origin, status.date.isoformat(), status.snapshot) + ) + origins = list(CSVOriginIterator(origins_csv)) assert origins - assert len(origins) == len(TEST_OBJECTS["origin"]) - - -def test_archive_api_origin_iterator(swh_storage_with_objects, archive_api): - """Test ArchiveOriginIterator against the ArchiveStorage""" - origins = list(ArchiveOriginIterator(archive_api)) - assert origins - assert len(origins) == len(TEST_OBJECTS["origin"]) + assert len(origins) == len( + list( + { + status.origin + for status in TEST_OBJECTS["origin_visit_status"] + if status.snapshot is not None + } + ) + ) diff --git a/swh/provenance/tests/test_provenance_db.py b/swh/provenance/tests/test_provenance_db.py index efab8e5..f98d059 100644 --- a/swh/provenance/tests/test_provenance_db.py +++ b/swh/provenance/tests/test_provenance_db.py @@ -1,27 +1,31 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from swh.model.tests.swh_model_data import TEST_OBJECTS from swh.provenance.origin import OriginEntry from swh.provenance.provenance import origin_add from swh.provenance.storage.archive import ArchiveStorage def ts2dt(ts: dict) -> datetime.datetime: timestamp = datetime.datetime.fromtimestamp( 
ts["timestamp"]["seconds"], datetime.timezone(datetime.timedelta(minutes=ts["offset"])), ) return timestamp.replace(microsecond=ts["timestamp"]["microseconds"]) def test_provenance_origin_add(provenance, swh_storage_with_objects): - """Test the ProvenanceDB.origin_add() method""" - for origin in TEST_OBJECTS["origin"]: - entry = OriginEntry(url=origin.url, revisions=[]) - origin_add(ArchiveStorage(swh_storage_with_objects), provenance, entry) + """Test the origin_add function""" + archive = ArchiveStorage(swh_storage_with_objects) + for status in TEST_OBJECTS["origin_visit_status"]: + if status.snapshot is not None: + entry = OriginEntry( + url=status.origin, date=status.date, snapshot=status.snapshot + ) + origin_add(provenance, archive, [entry]) # TODO: check some facts here