diff --git a/swh/provenance/__init__.py b/swh/provenance/__init__.py index 8fa2a90..ab025dc 100644 --- a/swh/provenance/__init__.py +++ b/swh/provenance/__init__.py @@ -1,48 +1,83 @@ +from __future__ import annotations + from typing import TYPE_CHECKING if TYPE_CHECKING: from .archive import ArchiveInterface from .provenance import ProvenanceInterface, ProvenanceStorageInterface -def get_archive(cls: str, **kwargs) -> "ArchiveInterface": +def get_archive(cls: str, **kwargs) -> ArchiveInterface: + """Get an archive object of class ``cls`` with arguments ``args``. + + Args: + cls: archive's class, either 'api' or 'direct' + args: dictionary of arguments passed to the archive class constructor + + Returns: + an instance of archive object (either using swh.storage API or direct + queries to the archive's database) + + Raises: + :cls:`ValueError` if passed an unknown archive class. + """ if cls == "api": from swh.storage import get_storage from .storage.archive import ArchiveStorage return ArchiveStorage(get_storage(**kwargs["storage"])) elif cls == "direct": from swh.core.db import BaseDb from .postgresql.archive import ArchivePostgreSQL return ArchivePostgreSQL(BaseDb.connect(**kwargs["db"]).conn) else: - raise NotImplementedError + raise ValueError + +def get_provenance(**kwargs) -> ProvenanceInterface: + """Get an provenance object with arguments ``args``. -def get_provenance(**kwargs) -> "ProvenanceInterface": + Args: + args: dictionary of arguments to retrieve a swh.provenance.storage + class (see :func:`get_provenance_storage` for details) + + Returns: + an instance of provenance object + """ from .backend import ProvenanceBackend return ProvenanceBackend(get_provenance_storage(**kwargs)) -def get_provenance_storage(cls: str, **kwargs) -> "ProvenanceStorageInterface": +def get_provenance_storage(cls: str, **kwargs) -> ProvenanceStorageInterface: + """Get an archive object of class ``cls`` with arguments ``args``. + + Args: + cls: storage's class, only 'local' is currently supported + args: dictionary of arguments passed to the storage class constructor + + Returns: + an instance of storage object + + Raises: + :cls:`ValueError` if passed an unknown archive class. + """ if cls == "local": from swh.core.db import BaseDb from .postgresql.provenancedb_base import ProvenanceDBBase conn = BaseDb.connect(**kwargs["db"]).conn - flavor = ProvenanceDBBase(conn).flavor - if flavor == "with-path": + if ProvenanceDBBase(conn).flavor == "with-path": from .postgresql.provenancedb_with_path import ProvenanceWithPathDB return ProvenanceWithPathDB(conn) else: from .postgresql.provenancedb_without_path import ProvenanceWithoutPathDB return ProvenanceWithoutPathDB(conn) else: - raise NotImplementedError + raise ValueError diff --git a/swh/provenance/backend.py b/swh/provenance/backend.py index fa1b781..788091a 100644 --- a/swh/provenance/backend.py +++ b/swh/provenance/backend.py @@ -1,324 +1,324 @@ from datetime import datetime import logging import os from typing import Dict, Generator, Iterable, Optional, Set, Tuple from typing_extensions import Literal, TypedDict from swh.model.model import Sha1Git from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry from .provenance import ProvenanceResult, ProvenanceStorageInterface, RelationType class DatetimeCache(TypedDict): data: Dict[Sha1Git, Optional[datetime]] added: Set[Sha1Git] class OriginCache(TypedDict): data: Dict[Sha1Git, str] added: Set[Sha1Git] class RevisionCache(TypedDict): data: Dict[Sha1Git, Sha1Git] added: Set[Sha1Git] class ProvenanceCache(TypedDict): content: DatetimeCache directory: DatetimeCache revision: DatetimeCache # below are insertion caches only content_in_revision: Set[Tuple[Sha1Git, Sha1Git, bytes]] content_in_directory: Set[Tuple[Sha1Git, Sha1Git, bytes]] directory_in_revision: Set[Tuple[Sha1Git, Sha1Git, bytes]] # these two are for the origin layer origin: OriginCache revision_origin: RevisionCache revision_before_revision: Dict[Sha1Git, Set[Sha1Git]] revision_in_origin: Set[Tuple[Sha1Git, Sha1Git]] def new_cache() -> ProvenanceCache: return ProvenanceCache( content=DatetimeCache(data={}, added=set()), directory=DatetimeCache(data={}, added=set()), revision=DatetimeCache(data={}, added=set()), content_in_revision=set(), content_in_directory=set(), directory_in_revision=set(), origin=OriginCache(data={}, added=set()), revision_origin=RevisionCache(data={}, added=set()), revision_before_revision={}, revision_in_origin=set(), ) # TODO: maybe move this to a separate file class ProvenanceBackend: - def __init__(self, storage: ProvenanceStorageInterface): + def __init__(self, storage: ProvenanceStorageInterface) -> None: self.storage = storage self.cache = new_cache() def clear_caches(self) -> None: self.cache = new_cache() def flush(self) -> None: # Revision-content layer insertions ############################################ # For this layer, relations need to be inserted first so that, in case of # failure, reprocessing the input does not generated an inconsistent database. while not self.storage.relation_add( RelationType.CNT_EARLY_IN_REV, self.cache["content_in_revision"] ): logging.warning( f"Unable to write {RelationType.CNT_EARLY_IN_REV} rows to the storage. " f"Data: {self.cache['content_in_revision']}. Retrying..." ) while not self.storage.relation_add( RelationType.CNT_IN_DIR, self.cache["content_in_directory"] ): logging.warning( f"Unable to write {RelationType.CNT_IN_DIR} rows to the storage. " f"Data: {self.cache['content_in_directory']}. Retrying..." ) while not self.storage.relation_add( RelationType.DIR_IN_REV, self.cache["directory_in_revision"] ): logging.warning( f"Unable to write {RelationType.DIR_IN_REV} rows to the storage. " f"Data: {self.cache['directory_in_revision']}. Retrying..." ) # After relations, dates for the entities can be safely set, acknowledging that # these entities won't need to be reprocessed in case of failure. dates = { sha1: date for sha1, date in self.cache["content"]["data"].items() if sha1 in self.cache["content"]["added"] and date is not None } while not self.storage.content_set_date(dates): logging.warning( f"Unable to write content dates to the storage. " f"Data: {dates}. Retrying..." ) dates = { sha1: date for sha1, date in self.cache["directory"]["data"].items() if sha1 in self.cache["directory"]["added"] and date is not None } while not self.storage.directory_set_date(dates): logging.warning( f"Unable to write directory dates to the storage. " f"Data: {dates}. Retrying..." ) dates = { sha1: date for sha1, date in self.cache["revision"]["data"].items() if sha1 in self.cache["revision"]["added"] and date is not None } while not self.storage.revision_set_date(dates): logging.warning( f"Unable to write revision dates to the storage. " f"Data: {dates}. Retrying..." ) # Origin-revision layer insertions ############################################# # Origins urls should be inserted first so that internal ids' resolution works # properly. urls = { sha1: date for sha1, date in self.cache["origin"]["data"].items() if sha1 in self.cache["origin"]["added"] } while not self.storage.origin_set_url(urls): logging.warning( f"Unable to write origins urls to the storage. " f"Data: {urls}. Retrying..." ) # Second, flat models for revisions' histories (ie. revision-before-revision). rbr_data: Iterable[Tuple[Sha1Git, Sha1Git, Optional[bytes]]] = sum( [ [ (prev, next, None) for next in self.cache["revision_before_revision"][prev] ] for prev in self.cache["revision_before_revision"] ], [], ) while not self.storage.relation_add(RelationType.REV_BEFORE_REV, rbr_data): logging.warning( f"Unable to write {RelationType.REV_BEFORE_REV} rows to the storage. " f"Data: {rbr_data}. Retrying..." ) # Heads (ie. revision-in-origin entries) should be inserted once flat models for # their histories were already added. This is to guarantee consistent results if # something needs to be reprocessed due to a failure: already inserted heads # won't get reprocessed in such a case. rio_data = [(rev, org, None) for rev, org in self.cache["revision_in_origin"]] while not self.storage.relation_add(RelationType.REV_IN_ORG, rio_data): logging.warning( f"Unable to write {RelationType.REV_IN_ORG} rows to the storage. " f"Data: {rio_data}. Retrying..." ) # Finally, preferred origins for the visited revisions are set (this step can be # reordered if required). origins = { sha1: self.cache["revision_origin"]["data"][sha1] for sha1 in self.cache["revision_origin"]["added"] } while not self.storage.revision_set_origin(origins): logging.warning( f"Unable to write preferred origins to the storage. " f"Data: {origins}. Retrying..." ) # clear local cache ############################################################ self.clear_caches() def content_add_to_directory( self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes ) -> None: self.cache["content_in_directory"].add( (blob.id, directory.id, normalize(os.path.join(prefix, blob.name))) ) def content_add_to_revision( self, revision: RevisionEntry, blob: FileEntry, prefix: bytes ) -> None: self.cache["content_in_revision"].add( (blob.id, revision.id, normalize(os.path.join(prefix, blob.name))) ) def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: return self.storage.content_find_first(id) def content_find_all( self, id: Sha1Git, limit: Optional[int] = None ) -> Generator[ProvenanceResult, None, None]: yield from self.storage.content_find_all(id, limit=limit) def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: return self.get_dates("content", [blob.id]).get(blob.id) def content_get_early_dates( self, blobs: Iterable[FileEntry] ) -> Dict[Sha1Git, datetime]: return self.get_dates("content", [blob.id for blob in blobs]) def content_set_early_date(self, blob: FileEntry, date: datetime) -> None: self.cache["content"]["data"][blob.id] = date self.cache["content"]["added"].add(blob.id) def directory_add_to_revision( self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes ) -> None: self.cache["directory_in_revision"].add( (directory.id, revision.id, normalize(path)) ) def directory_get_date_in_isochrone_frontier( self, directory: DirectoryEntry ) -> Optional[datetime]: return self.get_dates("directory", [directory.id]).get(directory.id) def directory_get_dates_in_isochrone_frontier( self, dirs: Iterable[DirectoryEntry] ) -> Dict[Sha1Git, datetime]: return self.get_dates("directory", [directory.id for directory in dirs]) def directory_set_date_in_isochrone_frontier( self, directory: DirectoryEntry, date: datetime ) -> None: self.cache["directory"]["data"][directory.id] = date self.cache["directory"]["added"].add(directory.id) def get_dates( self, entity: Literal["content", "directory", "revision"], ids: Iterable[Sha1Git], ) -> Dict[Sha1Git, datetime]: cache = self.cache[entity] missing_ids = set(id for id in ids if id not in cache) if missing_ids: if entity == "revision": updated = { id: date for id, (date, _) in self.storage.revision_get(missing_ids).items() if date is not None } else: updated = getattr(self.storage, f"{entity}_get")(missing_ids) cache["data"].update(updated) dates: Dict[Sha1Git, datetime] = {} for sha1 in ids: date = cache["data"].get(sha1) if date is not None: dates[sha1] = date return dates def origin_add(self, origin: OriginEntry) -> None: self.cache["origin"]["data"][origin.id] = origin.url self.cache["origin"]["added"].add(origin.id) def revision_add(self, revision: RevisionEntry) -> None: self.cache["revision"]["data"][revision.id] = revision.date self.cache["revision"]["added"].add(revision.id) def revision_add_before_revision( self, head: RevisionEntry, revision: RevisionEntry ) -> None: self.cache["revision_before_revision"].setdefault(revision.id, set()).add( head.id ) def revision_add_to_origin( self, origin: OriginEntry, revision: RevisionEntry ) -> None: self.cache["revision_in_origin"].add((revision.id, origin.id)) def revision_get_date(self, revision: RevisionEntry) -> Optional[datetime]: return self.get_dates("revision", [revision.id]).get(revision.id) def revision_get_preferred_origin( self, revision: RevisionEntry ) -> Optional[Sha1Git]: cache = self.cache["revision_origin"]["data"] if revision.id not in cache: ret = self.storage.revision_get([revision.id]) if revision.id in ret: origin = ret[revision.id][1] # TODO: make this not a tuple if origin is not None: cache[revision.id] = origin return cache.get(revision.id) def revision_in_history(self, revision: RevisionEntry) -> bool: return revision.id in self.cache["revision_before_revision"] or bool( self.storage.relation_get(RelationType.REV_BEFORE_REV, [revision.id]) ) def revision_set_preferred_origin( self, origin: OriginEntry, revision: RevisionEntry ) -> None: self.cache["revision_origin"]["data"][revision.id] = origin.id self.cache["revision_origin"]["added"].add(revision.id) def revision_visited(self, revision: RevisionEntry) -> bool: return revision.id in dict(self.cache["revision_in_origin"]) or bool( self.storage.relation_get(RelationType.REV_IN_ORG, [revision.id]) ) def normalize(path: bytes) -> bytes: return path[2:] if path.startswith(bytes("." + os.path.sep, "utf-8")) else path diff --git a/swh/provenance/cli.py b/swh/provenance/cli.py index 704260b..cd3fb59 100644 --- a/swh/provenance/cli.py +++ b/swh/provenance/cli.py @@ -1,224 +1,224 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # WARNING: do not import unnecessary things here to keep cli startup time under # control from datetime import datetime, timezone import os from typing import Any, Dict, Generator, Optional, Tuple import click import iso8601 import yaml from swh.core import config from swh.core.cli import CONTEXT_SETTINGS from swh.core.cli import swh as swh_cli_group from swh.model.hashutil import hash_to_bytes, hash_to_hex from swh.model.model import Sha1Git # All generic config code should reside in swh.core.config CONFIG_ENVVAR = "SWH_CONFIG_FILE" DEFAULT_CONFIG_PATH = os.path.join(click.get_app_dir("swh"), "global.yml") DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, DEFAULT_CONFIG_PATH) DEFAULT_CONFIG: Dict[str, Any] = { "archive": { "cls": "api", "storage": { "cls": "remote", "url": "http://uffizi.internal.softwareheritage.org:5002", } # "cls": "direct", # "db": { # "host": "db.internal.softwareheritage.org", # "dbname": "softwareheritage", # "user": "guest" # } }, "provenance": {"cls": "local", "db": {"host": "localhost", "dbname": "provenance"}}, } CONFIG_FILE_HELP = f"""Configuration file: \b The CLI option or the environment variable will fail if invalid. CLI option is checked first. Then, environment variable {CONFIG_ENVVAR} is checked. Then, if cannot load the default path, a set of default values are used. Default config path is {DEFAULT_CONFIG_PATH}. Default config values are: \b {yaml.dump(DEFAULT_CONFIG)}""" PROVENANCE_HELP = f"""Software Heritage Scanner tools. {CONFIG_FILE_HELP}""" @swh_cli_group.group( name="provenance", context_settings=CONTEXT_SETTINGS, help=PROVENANCE_HELP ) @click.option( "-C", "--config-file", default=None, type=click.Path(exists=False, dir_okay=False, path_type=str), help="""YAML configuration file.""", ) @click.option( "-P", "--profile", default=None, type=click.Path(exists=False, dir_okay=False, path_type=str), help="""Enable profiling to specified file.""", ) @click.pass_context -def cli(ctx, config_file: Optional[str], profile: str) -> None: +def cli(ctx: click.core.Context, config_file: Optional[str], profile: str) -> None: if config_file is None and config.config_exists(DEFAULT_PATH): config_file = DEFAULT_PATH if config_file is None: conf = DEFAULT_CONFIG else: # read_raw_config do not fail on ENOENT if not config.config_exists(config_file): raise FileNotFoundError(config_file) conf = config.read_raw_config(config.config_basepath(config_file)) conf = config.merge_configs(DEFAULT_CONFIG, conf) ctx.ensure_object(dict) ctx.obj["config"] = conf if profile: import atexit import cProfile print("Profiling...") pr = cProfile.Profile() pr.enable() def exit() -> None: pr.disable() pr.dump_stats(profile) atexit.register(exit) @cli.command(name="iter-revisions") @click.argument("filename") @click.option("-a", "--track-all", default=True, type=bool) @click.option("-l", "--limit", type=int) @click.option("-m", "--min-depth", default=1, type=int) @click.option("-r", "--reuse", default=True, type=bool) @click.pass_context def iter_revisions( - ctx, + ctx: click.core.Context, filename: str, track_all: bool, limit: Optional[int], min_depth: int, reuse: bool, ) -> None: # TODO: add file size filtering """Process a provided list of revisions.""" from . import get_archive, get_provenance from .revision import CSVRevisionIterator, revision_add archive = get_archive(**ctx.obj["config"]["archive"]) provenance = get_provenance(**ctx.obj["config"]["provenance"]) revisions_provider = generate_revision_tuples(filename) revisions = CSVRevisionIterator(revisions_provider, limit=limit) for revision in revisions: revision_add( provenance, archive, [revision], trackall=track_all, lower=reuse, mindepth=min_depth, ) def generate_revision_tuples( filename: str, ) -> Generator[Tuple[Sha1Git, datetime, Sha1Git], None, None]: for line in open(filename, "r"): if line.strip(): revision, date, root = line.strip().split(",") yield ( hash_to_bytes(revision), iso8601.parse_date(date, default_timezone=timezone.utc), hash_to_bytes(root), ) @cli.command(name="iter-origins") @click.argument("filename") @click.option("-l", "--limit", type=int) @click.pass_context -def iter_origins(ctx, filename: str, limit: Optional[int]) -> None: +def iter_origins(ctx: click.core.Context, filename: str, limit: Optional[int]) -> None: """Process a provided list of origins.""" from . import get_archive, get_provenance from .origin import CSVOriginIterator, origin_add archive = get_archive(**ctx.obj["config"]["archive"]) provenance = get_provenance(**ctx.obj["config"]["provenance"]) origins_provider = generate_origin_tuples(filename) origins = CSVOriginIterator(origins_provider, limit=limit) for origin in origins: origin_add(provenance, archive, [origin]) def generate_origin_tuples(filename: str) -> Generator[Tuple[str, bytes], None, None]: for line in open(filename, "r"): if line.strip(): url, snapshot = line.strip().split(",") yield (url, hash_to_bytes(snapshot)) @cli.command(name="find-first") @click.argument("swhid") @click.pass_context -def find_first(ctx, swhid: str) -> None: +def find_first(ctx: click.core.Context, swhid: str) -> None: """Find first occurrence of the requested blob.""" from . import get_provenance provenance = get_provenance(**ctx.obj["config"]["provenance"]) # TODO: return a dictionary with proper keys for each field occur = provenance.content_find_first(hash_to_bytes(swhid)) if occur is not None: print( f"swh:1:cnt:{hash_to_hex(occur.content)}, " f"swh:1:rev:{hash_to_hex(occur.revision)}, " f"{occur.date}, " f"{occur.origin}, " f"{os.fsdecode(occur.path)}" ) else: print(f"Cannot find a content with the id {swhid}") @cli.command(name="find-all") @click.argument("swhid") @click.option("-l", "--limit", type=int) @click.pass_context -def find_all(ctx, swhid: str, limit: Optional[int]) -> None: +def find_all(ctx: click.core.Context, swhid: str, limit: Optional[int]) -> None: """Find all occurrences of the requested blob.""" from . import get_provenance provenance = get_provenance(**ctx.obj["config"]["provenance"]) # TODO: return a dictionary with proper keys for each field for occur in provenance.content_find_all(hash_to_bytes(swhid), limit=limit): print( f"swh:1:cnt:{hash_to_hex(occur.content)}, " f"swh:1:rev:{hash_to_hex(occur.revision)}, " f"{occur.date}, " f"{occur.origin}, " f"{os.fsdecode(occur.path)}" ) diff --git a/swh/provenance/graph.py b/swh/provenance/graph.py index 0eb15a7..1d782cd 100644 --- a/swh/provenance/graph.py +++ b/swh/provenance/graph.py @@ -1,258 +1,260 @@ +from __future__ import annotations + from datetime import datetime, timezone import logging import os -from typing import Dict, Optional, Set +from typing import Any, Dict, Optional, Set from swh.model.model import Sha1Git from .archive import ArchiveInterface from .model import DirectoryEntry, RevisionEntry from .provenance import ProvenanceInterface UTCMIN = datetime.min.replace(tzinfo=timezone.utc) class HistoryNode: def __init__( self, entry: RevisionEntry, visited: bool = False, in_history: bool = False - ): + ) -> None: self.entry = entry # A revision is `visited` if it is directly pointed by an origin (ie. a head # revision for some snapshot) self.visited = visited # A revision is `in_history` if it appears in the history graph of an already # processed revision in the provenance database self.in_history = in_history self.parents: Set[HistoryNode] = set() def add_parent( self, parent: RevisionEntry, visited: bool = False, in_history: bool = False - ) -> "HistoryNode": + ) -> HistoryNode: node = HistoryNode(parent, visited=visited, in_history=in_history) self.parents.add(node) return node - def __str__(self): + def __str__(self) -> str: return ( f"<{self.entry}: visited={self.visited}, in_history={self.in_history}, " f"parents=[{', '.join(str(parent) for parent in self.parents)}]>" ) - def __eq__(self, other): + def __eq__(self, other: Any) -> bool: return isinstance(other, HistoryNode) and self.__dict__ == other.__dict__ - def __hash__(self): + def __hash__(self) -> int: return hash((self.entry, self.visited, self.in_history)) def build_history_graph( archive: ArchiveInterface, provenance: ProvenanceInterface, revision: RevisionEntry, ) -> HistoryNode: """Recursively build the history graph from the given revision""" root = HistoryNode( revision, visited=provenance.revision_visited(revision), in_history=provenance.revision_in_history(revision), ) stack = [root] logging.debug( f"Recursively creating history graph for revision {revision.id.hex()}..." ) while stack: current = stack.pop() if not current.visited: current.entry.retrieve_parents(archive) for rev in current.entry.parents: node = current.add_parent( rev, visited=provenance.revision_visited(rev), in_history=provenance.revision_in_history(rev), ) stack.append(node) logging.debug( f"History graph for revision {revision.id.hex()} successfully created!" ) return root class IsochroneNode: def __init__( self, entry: DirectoryEntry, dbdate: Optional[datetime] = None, depth: int = 0, prefix: bytes = b"", - ): + ) -> None: self.entry = entry self.depth = depth # dbdate is the maxdate for this node that comes from the DB self._dbdate: Optional[datetime] = dbdate # maxdate is set by the maxdate computation algorithm self.maxdate: Optional[datetime] = None # known is True if this node is already known in the db; either because # the current directory actually exists in the database, or because all # the content of the current directory is known (subdirectories and files) self.known = self.dbdate is not None self.invalid = False self.path = os.path.join(prefix, self.entry.name) if prefix else self.entry.name self.children: Set[IsochroneNode] = set() @property - def dbdate(self): + def dbdate(self) -> Optional[datetime]: # use a property to make this attribute (mostly) read-only return self._dbdate - def invalidate(self): + def invalidate(self) -> None: self._dbdate = None self.maxdate = None self.known = False self.invalid = True def add_directory( self, child: DirectoryEntry, date: Optional[datetime] = None - ) -> "IsochroneNode": + ) -> IsochroneNode: # we should not be processing this node (ie add subdirectories or files) if it's # actually known by the provenance DB assert self.dbdate is None node = IsochroneNode(child, dbdate=date, depth=self.depth + 1, prefix=self.path) self.children.add(node) return node - def __str__(self): + def __str__(self) -> str: return ( f"<{self.entry}: depth={self.depth}, " f"dbdate={self.dbdate}, maxdate={self.maxdate}, " - f"known={self.known}, invalid={self.invalid}, path={self.path}, " + f"known={self.known}, invalid={self.invalid}, path={self.path!r}, " f"children=[{', '.join(str(child) for child in self.children)}]>" ) - def __eq__(self, other): + def __eq__(self, other: Any) -> bool: return isinstance(other, IsochroneNode) and self.__dict__ == other.__dict__ - def __hash__(self): + def __hash__(self) -> int: # only immutable attributes are considered to compute hash return hash((self.entry, self.depth, self.path)) def build_isochrone_graph( archive: ArchiveInterface, provenance: ProvenanceInterface, revision: RevisionEntry, directory: DirectoryEntry, ) -> IsochroneNode: assert revision.date is not None assert revision.root == directory.id # this function process a revision in 2 steps: # # 1. build the tree structure of IsochroneNode objects (one INode per # directory under the root directory of the revision but not following # known subdirectories), and gather the dates from the DB for already # known objects; for files, just keep all the dates in a global 'fdates' # dict; note that in this step, we will only recurse the directories # that are not already known. # # 2. compute the maxdate for each node of the tree that was not found in the DB. # Build the nodes structure root_date = provenance.directory_get_date_in_isochrone_frontier(directory) root = IsochroneNode(directory, dbdate=root_date) stack = [root] logging.debug( f"Recursively creating isochrone graph for revision {revision.id.hex()}..." ) fdates: Dict[Sha1Git, datetime] = {} # map {file_id: date} while stack: current = stack.pop() if current.dbdate is None or current.dbdate > revision.date: # If current directory has an associated date in the isochrone frontier that # is greater or equal to the current revision's one, it should be ignored as # the revision is being processed out of order. if current.dbdate is not None and current.dbdate > revision.date: logging.debug( f"Invalidating frontier on {current.entry.id.hex()}" f" (date {current.dbdate})" f" when processing revision {revision.id.hex()}" f" (date {revision.date})" ) current.invalidate() # Pre-query all known dates for directories in the current directory # for the provenance object to have them cached and (potentially) improve # performance. current.entry.retrieve_children(archive) ddates = provenance.directory_get_dates_in_isochrone_frontier( current.entry.dirs ) for dir in current.entry.dirs: # Recursively analyse subdirectory nodes node = current.add_directory(dir, date=ddates.get(dir.id, None)) stack.append(node) fdates.update(provenance.content_get_early_dates(current.entry.files)) logging.debug( f"Isochrone graph for revision {revision.id.hex()} successfully created!" ) # Precalculate max known date for each node in the graph (only directory nodes are # pushed to the stack). logging.debug(f"Computing maxdates for revision {revision.id.hex()}...") stack = [root] while stack: current = stack.pop() # Current directory node is known if it already has an assigned date (ie. it was # already seen as an isochrone frontier). if current.known: assert current.maxdate is None current.maxdate = current.dbdate else: if any(x.maxdate is None for x in current.children): # at least one child of current has no maxdate yet # Current node needs to be analysed again after its children. stack.append(current) for child in current.children: if child.maxdate is None: # if child.maxdate is None, it must be processed stack.append(child) else: # all the files and directories under current have a maxdate, # we can infer the maxdate for current directory assert current.maxdate is None # if all content is already known, update current directory info. current.maxdate = max( [UTCMIN] + [ child.maxdate for child in current.children if child.maxdate is not None # unnecessary, but needed for mypy ] + [ fdates.get(file.id, revision.date) for file in current.entry.files ] ) if current.maxdate <= revision.date: current.known = ( # true if all subdirectories are known all(child.known for child in current.children) # true if all files are in fdates, i.e. if all files were known # *before building this isochrone graph node* # Note: the 'all()' is lazy: will stop iterating as soon as # possible and all((file.id in fdates) for file in current.entry.files) ) else: # at least one content is being processed out-of-order, then current # node should be treated as unknown current.maxdate = revision.date current.known = False logging.debug(f"Maxdates for revision {revision.id.hex()} successfully computed!") return root diff --git a/swh/provenance/model.py b/swh/provenance/model.py index cfe6c1f..c1e3a36 100644 --- a/swh/provenance/model.py +++ b/swh/provenance/model.py @@ -1,147 +1,149 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from __future__ import annotations + from datetime import datetime from typing import Iterable, Iterator, List, Optional from swh.model.hashutil import hash_to_bytes from swh.model.identifiers import origin_identifier from swh.model.model import Sha1Git from .archive import ArchiveInterface class OriginEntry: - def __init__(self, url: str, snapshot: Sha1Git): + def __init__(self, url: str, snapshot: Sha1Git) -> None: self.url = url self.id: Sha1Git = hash_to_bytes(origin_identifier({"url": self.url})) self.snapshot = snapshot self._revisions: Optional[List[RevisionEntry]] = None - def retrieve_revisions(self, archive: ArchiveInterface): + def retrieve_revisions(self, archive: ArchiveInterface) -> None: if self._revisions is None: self._revisions = [ RevisionEntry(rev) for rev in archive.snapshot_get_heads(self.snapshot) ] @property - def revisions(self) -> Iterator["RevisionEntry"]: + def revisions(self) -> Iterator[RevisionEntry]: if self._revisions is None: raise RuntimeError( "Revisions of this node has not yet been retrieved. " "Please call retrieve_revisions() before using this property." ) return (x for x in self._revisions) - def __str__(self): + def __str__(self) -> str: return f"" class RevisionEntry: def __init__( self, id: Sha1Git, date: Optional[datetime] = None, root: Optional[Sha1Git] = None, parents: Optional[Iterable[Sha1Git]] = None, - ): + ) -> None: self.id = id self.date = date assert self.date is None or self.date.tzinfo is not None self.root = root self._parents_ids = parents self._parents_entries: Optional[List[RevisionEntry]] = None - def retrieve_parents(self, archive: ArchiveInterface): + def retrieve_parents(self, archive: ArchiveInterface) -> None: if self._parents_entries is None: if self._parents_ids is None: self._parents_ids = archive.revision_get_parents(self.id) self._parents_entries = [RevisionEntry(id) for id in self._parents_ids] @property - def parents(self) -> Iterator["RevisionEntry"]: + def parents(self) -> Iterator[RevisionEntry]: if self._parents_entries is None: raise RuntimeError( "Parents of this node has not yet been retrieved. " "Please call retrieve_parents() before using this property." ) return (x for x in self._parents_entries) - def __str__(self): + def __str__(self) -> str: return f"" - def __eq__(self, other): + def __eq__(self, other) -> bool: return isinstance(other, RevisionEntry) and self.id == other.id - def __hash__(self): + def __hash__(self) -> int: return hash(self.id) class DirectoryEntry: - def __init__(self, id: Sha1Git, name: bytes = b""): + def __init__(self, id: Sha1Git, name: bytes = b"") -> None: self.id = id self.name = name self._files: Optional[List[FileEntry]] = None self._dirs: Optional[List[DirectoryEntry]] = None - def retrieve_children(self, archive: ArchiveInterface): + def retrieve_children(self, archive: ArchiveInterface) -> None: if self._files is None and self._dirs is None: self._files = [] self._dirs = [] for child in archive.directory_ls(self.id): if child["type"] == "dir": self._dirs.append( DirectoryEntry(child["target"], name=child["name"]) ) elif child["type"] == "file": self._files.append(FileEntry(child["target"], child["name"])) @property - def files(self) -> Iterator["FileEntry"]: + def files(self) -> Iterator[FileEntry]: if self._files is None: raise RuntimeError( "Children of this node has not yet been retrieved. " "Please call retrieve_children() before using this property." ) return (x for x in self._files) @property - def dirs(self) -> Iterator["DirectoryEntry"]: + def dirs(self) -> Iterator[DirectoryEntry]: if self._dirs is None: raise RuntimeError( "Children of this node has not yet been retrieved. " "Please call retrieve_children() before using this property." ) return (x for x in self._dirs) - def __str__(self): - return f"" + def __str__(self) -> str: + return f"" - def __eq__(self, other): + def __eq__(self, other) -> bool: return isinstance(other, DirectoryEntry) and (self.id, self.name) == ( other.id, other.name, ) - def __hash__(self): + def __hash__(self) -> int: return hash((self.id, self.name)) class FileEntry: - def __init__(self, id: Sha1Git, name: bytes): + def __init__(self, id: Sha1Git, name: bytes) -> None: self.id = id self.name = name - def __str__(self): - return f"" + def __str__(self) -> str: + return f"" - def __eq__(self, other): + def __eq__(self, other) -> bool: return isinstance(other, FileEntry) and (self.id, self.name) == ( other.id, other.name, ) - def __hash__(self): + def __hash__(self) -> int: return hash((self.id, self.name)) diff --git a/swh/provenance/postgresql/archive.py b/swh/provenance/postgresql/archive.py index 8737f31..d89f941 100644 --- a/swh/provenance/postgresql/archive.py +++ b/swh/provenance/postgresql/archive.py @@ -1,110 +1,110 @@ from typing import Any, Dict, Iterable, List from methodtools import lru_cache import psycopg2 from swh.model.model import Sha1Git from swh.storage.postgresql.storage import Storage class ArchivePostgreSQL: - def __init__(self, conn: psycopg2.extensions.connection): + def __init__(self, conn: psycopg2.extensions.connection) -> None: self.conn = conn self.storage = Storage(conn, objstorage={"cls": "memory"}) def directory_ls(self, id: Sha1Git) -> Iterable[Dict[str, Any]]: - entries = self.directory_ls_internal(id) + entries = self._directory_ls(id) yield from entries @lru_cache(maxsize=100000) - def directory_ls_internal(self, id: Sha1Git) -> List[Dict[str, Any]]: + def _directory_ls(self, id: Sha1Git) -> List[Dict[str, Any]]: # TODO: add file size filtering with self.conn.cursor() as cursor: cursor.execute( """ WITH dir AS (SELECT id AS dir_id, dir_entries, file_entries, rev_entries FROM directory WHERE id=%s), ls_d AS (SELECT dir_id, UNNEST(dir_entries) AS entry_id FROM dir), ls_f AS (SELECT dir_id, UNNEST(file_entries) AS entry_id FROM dir), ls_r AS (SELECT dir_id, UNNEST(rev_entries) AS entry_id FROM dir) (SELECT 'dir'::directory_entry_type AS type, e.target, e.name, NULL::sha1_git FROM ls_d LEFT JOIN directory_entry_dir e ON ls_d.entry_id=e.id) UNION (WITH known_contents AS (SELECT 'file'::directory_entry_type AS type, e.target, e.name, c.sha1_git FROM ls_f LEFT JOIN directory_entry_file e ON ls_f.entry_id=e.id INNER JOIN content c ON e.target=c.sha1_git) SELECT * FROM known_contents UNION (SELECT 'file'::directory_entry_type AS type, e.target, e.name, c.sha1_git FROM ls_f LEFT JOIN directory_entry_file e ON ls_f.entry_id=e.id LEFT JOIN skipped_content c ON e.target=c.sha1_git WHERE NOT EXISTS ( SELECT 1 FROM known_contents WHERE known_contents.sha1_git=e.target ) ) ) """, (id,), ) return [ {"type": row[0], "target": row[1], "name": row[2]} for row in cursor.fetchall() ] def revision_get_parents(self, id: Sha1Git) -> Iterable[Sha1Git]: with self.conn.cursor() as cursor: cursor.execute( """ SELECT RH.parent_id::bytea FROM revision_history AS RH WHERE RH.id=%s ORDER BY RH.parent_rank """, (id,), ) # There should be at most one row anyway yield from (row[0] for row in cursor.fetchall()) def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]: with self.conn.cursor() as cursor: cursor.execute( """ WITH snaps AS (SELECT object_id FROM snapshot WHERE snapshot.id=%s), heads AS ((SELECT R.id, R.date FROM snaps JOIN snapshot_branches AS BS ON (snaps.object_id=BS.snapshot_id) JOIN snapshot_branch AS B ON (BS.branch_id=B.object_id) JOIN revision AS R ON (B.target=R.id) WHERE B.target_type='revision'::snapshot_target) UNION (SELECT RV.id, RV.date FROM snaps JOIN snapshot_branches AS BS ON (snaps.object_id=BS.snapshot_id) JOIN snapshot_branch AS B ON (BS.branch_id=B.object_id) JOIN release AS RL ON (B.target=RL.id) JOIN revision AS RV ON (RL.target=RV.id) WHERE B.target_type='release'::snapshot_target AND RL.target_type='revision'::object_type) ORDER BY date, id) SELECT id FROM heads """, (id,), ) yield from (row[0] for row in cursor.fetchall()) diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py index 9ce884a..13ca58b 100644 --- a/swh/provenance/provenance.py +++ b/swh/provenance/provenance.py @@ -1,264 +1,264 @@ from datetime import datetime import enum from typing import Dict, Generator, Iterable, Optional, Set, Tuple from typing_extensions import Protocol, runtime_checkable from swh.model.model import Sha1Git from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry +class RelationType(enum.Enum): + CNT_EARLY_IN_REV = "content_in_revision" + CNT_IN_DIR = "content_in_directory" + DIR_IN_REV = "directory_in_revision" + REV_IN_ORG = "revision_in_origin" + REV_BEFORE_REV = "revision_before_revision" + + class ProvenanceResult: def __init__( self, content: Sha1Git, revision: Sha1Git, date: datetime, origin: Optional[str], path: bytes, ) -> None: self.content = content self.revision = revision self.date = date self.origin = origin self.path = path +@runtime_checkable +class ProvenanceStorageInterface(Protocol): + raise_on_commit: bool = False + + def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: + """Retrieve the first occurrence of the blob identified by `id`.""" + ... + + def content_find_all( + self, id: Sha1Git, limit: Optional[int] = None + ) -> Generator[ProvenanceResult, None, None]: + """Retrieve all the occurrences of the blob identified by `id`.""" + ... + + def content_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: + """Associate dates to blobs identified by sha1 ids, as paired in `dates`. Return + a boolean stating whether the information was successfully stored. + """ + ... + + def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: + """Retrieve the associated date for each blob sha1 in `ids`. If some blob has + no associated date, it is not present in the resulting dictionary. + """ + ... + + def directory_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: + """Associate dates to directories identified by sha1 ids, as paired in + `dates`. Return a boolean stating whether the information was successfully + stored. + """ + ... + + def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: + """Retrieve the associated date for each directory sha1 in `ids`. If some + directory has no associated date, it is not present in the resulting dictionary. + """ + ... + + def origin_set_url(self, urls: Dict[Sha1Git, str]) -> bool: + """Associate urls to origins identified by sha1 ids, as paired in `urls`. Return + a boolean stating whether the information was successfully stored. + """ + ... + + def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]: + """Retrieve the associated url for each origin sha1 in `ids`. If some origin has + no associated date, it is not present in the resulting dictionary. + """ + ... + + def revision_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: + """Associate dates to revisions identified by sha1 ids, as paired in `dates`. + Return a boolean stating whether the information was successfully stored. + """ + ... + + def revision_set_origin(self, origins: Dict[Sha1Git, Sha1Git]) -> bool: + """Associate origins to revisions identified by sha1 ids, as paired in + `origins` (revision ids are keys and origin ids, values). Return a boolean + stating whether the information was successfully stored. + """ + ... + + def revision_get( + self, ids: Iterable[Sha1Git] + ) -> Dict[Sha1Git, Tuple[Optional[datetime], Optional[Sha1Git]]]: + """Retrieve the associated date and origin for each revision sha1 in `ids`. If + some revision has no associated date nor origin, it is not present in the + resulting dictionary. + """ + ... + + def relation_add( + self, + relation: RelationType, + data: Iterable[Tuple[Sha1Git, Sha1Git, Optional[bytes]]], + ) -> bool: + """Add entries in the selected `relation`. Each tuple in `data` is of the from + (`src`, `dst`, `path`), where `src` and `dst` are the sha1 ids of the entities + being related, and `path` is optional depending on the selected `relation`. + """ + ... + + def relation_get( + self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False + ) -> Set[Tuple[Sha1Git, Sha1Git, Optional[bytes]]]: + """Retrieve all tuples in the selected `relation` whose source entities are + identified by some sha1 id in `ids`. If `reverse` is set, destination entities + are matched instead. + """ + ... + + @runtime_checkable class ProvenanceInterface(Protocol): - storage: "ProvenanceStorageInterface" + storage: ProvenanceStorageInterface def flush(self) -> None: """Flush internal cache to the underlying `storage`.""" ... def content_add_to_directory( self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes ) -> None: """Associate `blob` with `directory` in the provenance model. `prefix` is the relative path from `directory` to `blob` (excluding `blob`'s name). """ ... def content_add_to_revision( self, revision: RevisionEntry, blob: FileEntry, prefix: bytes ) -> None: """Associate `blob` with `revision` in the provenance model. `prefix` is the absolute path from `revision`'s root directory to `blob` (excluding `blob`'s name). """ ... def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: """Retrieve the first occurrence of the blob identified by `id`.""" ... def content_find_all( self, id: Sha1Git, limit: Optional[int] = None ) -> Generator[ProvenanceResult, None, None]: """Retrieve all the occurrences of the blob identified by `id`.""" ... def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: """Retrieve the earliest known date of `blob`.""" ... def content_get_early_dates( self, blobs: Iterable[FileEntry] ) -> Dict[Sha1Git, datetime]: """Retrieve the earliest known date for each blob in `blobs`. If some blob has no associated date, it is not present in the resulting dictionary. """ ... def content_set_early_date(self, blob: FileEntry, date: datetime) -> None: """Associate `date` to `blob` as it's earliest known date.""" ... def directory_add_to_revision( self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes ) -> None: """Associate `directory` with `revision` in the provenance model. `path` is the absolute path from `revision`'s root directory to `directory` (including `directory`'s name). """ ... def directory_get_date_in_isochrone_frontier( self, directory: DirectoryEntry ) -> Optional[datetime]: """Retrieve the earliest known date of `directory` as an isochrone frontier in the provenance model. """ ... def directory_get_dates_in_isochrone_frontier( self, dirs: Iterable[DirectoryEntry] ) -> Dict[Sha1Git, datetime]: """Retrieve the earliest known date for each directory in `dirs` as isochrone frontiers provenance model. If some directory has no associated date, it is not present in the resulting dictionary. """ ... def directory_set_date_in_isochrone_frontier( self, directory: DirectoryEntry, date: datetime ) -> None: """Associate `date` to `directory` as it's earliest known date as an isochrone frontier in the provenance model. """ ... def origin_add(self, origin: OriginEntry) -> None: """Add `origin` to the provenance model.""" ... def revision_add(self, revision: RevisionEntry) -> None: """Add `revision` to the provenance model. This implies storing `revision`'s date in the model, thus `revision.date` must be a valid date. """ ... def revision_add_before_revision( self, head: RevisionEntry, revision: RevisionEntry ) -> None: """Associate `revision` to `head` as an ancestor of the latter.""" ... def revision_add_to_origin( self, origin: OriginEntry, revision: RevisionEntry ) -> None: """Associate `revision` to `origin` as a head revision of the latter (ie. the target of an snapshot for `origin` in the archive).""" ... def revision_get_date(self, revision: RevisionEntry) -> Optional[datetime]: """Retrieve the date associated to `revision`.""" ... def revision_get_preferred_origin( self, revision: RevisionEntry ) -> Optional[Sha1Git]: """Retrieve the preferred origin associated to `revision`.""" ... def revision_in_history(self, revision: RevisionEntry) -> bool: """Check if `revision` is known to be an ancestor of some head revision in the provenance model. """ ... def revision_set_preferred_origin( self, origin: OriginEntry, revision: RevisionEntry ) -> None: """Associate `origin` as the preferred origin for `revision`.""" ... def revision_visited(self, revision: RevisionEntry) -> bool: """Check if `revision` is known to be a head revision for some origin in the provenance model. """ ... - - -class RelationType(enum.Enum): - CNT_EARLY_IN_REV = "content_in_revision" - CNT_IN_DIR = "content_in_directory" - DIR_IN_REV = "directory_in_revision" - REV_IN_ORG = "revision_in_origin" - REV_BEFORE_REV = "revision_before_revision" - - -@runtime_checkable -class ProvenanceStorageInterface(Protocol): - raise_on_commit: bool = False - - def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: - """Retrieve the first occurrence of the blob identified by `id`.""" - ... - - def content_find_all( - self, id: Sha1Git, limit: Optional[int] = None - ) -> Generator[ProvenanceResult, None, None]: - """Retrieve all the occurrences of the blob identified by `id`.""" - ... - - def content_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: - """Associate dates to blobs identified by sha1 ids, as paired in `dates`. Return - a boolean stating whether the information was successfully stored. - """ - ... - - def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: - """Retrieve the associated date for each blob sha1 in `ids`. If some blob has - no associated date, it is not present in the resulting dictionary. - """ - ... - - def directory_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: - """Associate dates to directories identified by sha1 ids, as paired in - `dates`. Return a boolean stating whether the information was successfully - stored. - """ - ... - - def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: - """Retrieve the associated date for each directory sha1 in `ids`. If some - directory has no associated date, it is not present in the resulting dictionary. - """ - ... - - def origin_set_url(self, urls: Dict[Sha1Git, str]) -> bool: - """Associate urls to origins identified by sha1 ids, as paired in `urls`. Return - a boolean stating whether the information was successfully stored. - """ - ... - - def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]: - """Retrieve the associated url for each origin sha1 in `ids`. If some origin has - no associated date, it is not present in the resulting dictionary. - """ - ... - - def revision_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: - """Associate dates to revisions identified by sha1 ids, as paired in `dates`. - Return a boolean stating whether the information was successfully stored. - """ - ... - - def revision_set_origin(self, origins: Dict[Sha1Git, Sha1Git]) -> bool: - """Associate origins to revisions identified by sha1 ids, as paired in - `origins` (revision ids are keys and origin ids, values). Return a boolean - stating whether the information was successfully stored. - """ - ... - - def revision_get( - self, ids: Iterable[Sha1Git] - ) -> Dict[Sha1Git, Tuple[Optional[datetime], Optional[Sha1Git]]]: - """Retrieve the associated date and origin for each revision sha1 in `ids`. If - some revision has no associated date nor origin, it is not present in the - resulting dictionary. - """ - ... - - def relation_add( - self, - relation: RelationType, - data: Iterable[Tuple[Sha1Git, Sha1Git, Optional[bytes]]], - ) -> bool: - """Add entries in the selected `relation`. Each tuple in `data` is of the from - (`src`, `dst`, `path`), where `src` and `dst` are the sha1 ids of the entities - being related, and `path` is optional depending on the selected `relation`. - """ - ... - - def relation_get( - self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False - ) -> Set[Tuple[Sha1Git, Sha1Git, Optional[bytes]]]: - """Retrieve all tuples in the selected `relation` whose source entities are - identified by some sha1 id in `ids`. If `reverse` is set, destination entities - are matched instead. - """ - ... diff --git a/swh/provenance/revision.py b/swh/provenance/revision.py index 5ea60df..30c40bc 100644 --- a/swh/provenance/revision.py +++ b/swh/provenance/revision.py @@ -1,241 +1,242 @@ from datetime import datetime, timezone -from itertools import islice import logging import os import time from typing import Generator, Iterable, Iterator, List, Optional, Tuple from swh.model.model import Sha1Git from .archive import ArchiveInterface from .graph import IsochroneNode, build_isochrone_graph from .model import DirectoryEntry, RevisionEntry from .provenance import ProvenanceInterface class CSVRevisionIterator: """Iterator over revisions typically present in the given CSV file. The input is an iterator that produces 3 elements per row: (id, date, root) where: - id: is the id (sha1_git) of the revision - date: is the author date - root: sha1 of the directory """ def __init__( self, revisions: Iterable[Tuple[Sha1Git, datetime, Sha1Git]], limit: Optional[int] = None, ) -> None: self.revisions: Iterator[Tuple[Sha1Git, datetime, Sha1Git]] if limit is not None: + from itertools import islice + self.revisions = islice(revisions, limit) else: self.revisions = iter(revisions) def __iter__(self) -> Generator[RevisionEntry, None, None]: for id, date, root in self.revisions: if date.tzinfo is None: date = date.replace(tzinfo=timezone.utc) yield RevisionEntry(id, date=date, root=root) def revision_add( provenance: ProvenanceInterface, archive: ArchiveInterface, revisions: List[RevisionEntry], trackall: bool = True, lower: bool = True, mindepth: int = 1, commit: bool = True, ) -> None: start = time.time() for revision in revisions: assert revision.date is not None assert revision.root is not None # Processed content starting from the revision's root directory. date = provenance.revision_get_date(revision) if date is None or revision.date < date: logging.debug( f"Processing revisions {revision.id.hex()}" f" (known date {date} / revision date {revision.date})..." ) graph = build_isochrone_graph( archive, provenance, revision, DirectoryEntry(revision.root), ) # TODO: add file size filtering revision_process_content( archive, provenance, revision, graph, trackall=trackall, lower=lower, mindepth=mindepth, ) done = time.time() if commit: provenance.flush() stop = time.time() logging.debug( f"Revisions {';'.join([revision.id.hex() for revision in revisions])} " f" were processed in {stop - start} secs (commit took {stop - done} secs)!" ) def revision_process_content( archive: ArchiveInterface, provenance: ProvenanceInterface, revision: RevisionEntry, graph: IsochroneNode, trackall: bool = True, lower: bool = True, mindepth: int = 1, ) -> None: assert revision.date is not None provenance.revision_add(revision) stack = [graph] while stack: current = stack.pop() if current.dbdate is not None: assert current.dbdate <= revision.date if trackall: # Current directory is an outer isochrone frontier for a previously # processed revision. It should be reused as is. provenance.directory_add_to_revision( revision, current.entry, current.path ) else: assert current.maxdate is not None # Current directory is not an outer isochrone frontier for any previous # revision. It might be eligible for this one. if is_new_frontier( current, revision=revision, trackall=trackall, lower=lower, mindepth=mindepth, ): # Outer frontier should be moved to current position in the isochrone # graph. This is the first time this directory is found in the isochrone # frontier. provenance.directory_set_date_in_isochrone_frontier( current.entry, current.maxdate ) if trackall: provenance.directory_add_to_revision( revision, current.entry, current.path ) flatten_directory(archive, provenance, current.entry) else: # If current node is an invalidated frontier, update its date for future # revisions to get the proper value. if current.invalid: provenance.directory_set_date_in_isochrone_frontier( current.entry, current.maxdate ) # No point moving the frontier here. Either there are no files or they # are being seen for the first time here. Add all blobs to current # revision updating date if necessary, and recursively analyse # subdirectories as candidates to the outer frontier. for blob in current.entry.files: date = provenance.content_get_early_date(blob) if date is None or revision.date < date: provenance.content_set_early_date(blob, revision.date) provenance.content_add_to_revision(revision, blob, current.path) for child in current.children: stack.append(child) def flatten_directory( archive: ArchiveInterface, provenance: ProvenanceInterface, directory: DirectoryEntry, ) -> None: """Recursively retrieve all the files of 'directory' and insert them in the 'provenance' database in the 'content_to_directory' table. """ stack = [(directory, b"")] while stack: current, prefix = stack.pop() current.retrieve_children(archive) for f_child in current.files: # Add content to the directory with the computed prefix. provenance.content_add_to_directory(directory, f_child, prefix) for d_child in current.dirs: # Recursively walk the child directory. stack.append((d_child, os.path.join(prefix, d_child.name))) def is_new_frontier( node: IsochroneNode, revision: RevisionEntry, trackall: bool = True, lower: bool = True, mindepth: int = 1, ) -> bool: assert node.maxdate is not None # for mypy assert revision.date is not None # idem if trackall: # The only real condition for a directory to be a frontier is that its # content is already known and its maxdate is less (or equal) than # current revision's date. Checking mindepth is meant to skip root # directories (or any arbitrary depth) to improve the result. The # option lower tries to maximize the reusage rate of previously defined # frontiers by keeping them low in the directory tree. return ( node.known and node.maxdate <= revision.date # all content is earlier than revision and node.depth >= mindepth # current node is deeper than the min allowed depth and (has_blobs(node) if lower else True) # there is at least one blob in it ) else: # If we are only tracking first occurrences, we want to ensure that all first # occurrences end up in the content_early_in_rev relation. Thus, we force for # every blob outside a frontier to have an extrictly earlier date. return ( node.maxdate < revision.date # all content is earlier than revision and node.depth >= mindepth # deeper than the min allowed depth and (has_blobs(node) if lower else True) # there is at least one blob ) def has_blobs(node: IsochroneNode) -> bool: # We may want to look for files in different ways to decide whether to define a # frontier or not: # 1. Only files in current node: return any(node.entry.files) # 2. Files anywhere in the isochrone graph # stack = [node] # while stack: # current = stack.pop() # if any( # map(lambda child: isinstance(child.entry, FileEntry), current.children)): # return True # else: # # All children are directory entries. # stack.extend(current.children) # return False # 3. Files in the intermediate directories between current node and any previously # defined frontier: # TODO: complete this case! # return any( # map(lambda child: isinstance(child.entry, FileEntry), node.children) # ) or all( # map( # lambda child: ( # not (isinstance(child.entry, DirectoryEntry) and child.date is None) # ) # or has_blobs(child), # node.children, # ) # )