diff --git a/swh/provenance/archive.py b/swh/provenance/archive.py
index d391411..2196be6 100644
--- a/swh/provenance/archive.py
+++ b/swh/provenance/archive.py
@@ -1,54 +1,54 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from typing import Any, Dict, Iterable
 
 from typing_extensions import Protocol, runtime_checkable
 
 from swh.model.model import Sha1Git
 from swh.storage.interface import StorageInterface
 
 
 @runtime_checkable
 class ArchiveInterface(Protocol):
     storage: StorageInterface
 
-    def directory_ls(self, id: Sha1Git) -> Iterable[Dict[str, Any]]:
+    def directory_ls(self, id: Sha1Git, minsize: int = 0) -> Iterable[Dict[str, Any]]:
         """List entries for one directory.
 
         Args:
             id: sha1 id of the directory to list entries from.
 
         Yields:
             dictionary of entries in such directory containing only the keys
             "name", "target" and "type".
 
         """
         ...
 
     def revision_get_parents(self, id: Sha1Git) -> Iterable[Sha1Git]:
         """List parents of one revision.
 
         Args:
             revisions: sha1 id of the revision to list parents from.
 
         Yields:
             sha1 ids for the parents of such revision.
 
         """
         ...
 
     def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]:
         """List all revisions targeted by one snapshot.
 
         Args:
             id: sha1 id of the snapshot.
 
         Yields:
             sha1 ids of revisions that are a target of such snapshot.
             Revisions are guaranteed to be retrieved in chronological order
 
         """
         ...
diff --git a/swh/provenance/cli.py b/swh/provenance/cli.py
index 96359e3..7a49913 100644
--- a/swh/provenance/cli.py
+++ b/swh/provenance/cli.py
@@ -1,238 +1,240 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 # WARNING: do not import unnecessary things here to keep cli startup time under
 # control
 from datetime import datetime, timezone
 import os
 from typing import Any, Dict, Generator, Optional, Tuple
 
 import click
 import iso8601
 import yaml
 
 from swh.core import config
 from swh.core.cli import CONTEXT_SETTINGS
 from swh.core.cli import swh as swh_cli_group
 from swh.model.hashutil import hash_to_bytes, hash_to_hex
 from swh.model.model import Sha1Git
 
 # All generic config code should reside in swh.core.config
 CONFIG_ENVVAR = "SWH_CONFIG_FILENAME"
 DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, None)
 
 DEFAULT_CONFIG: Dict[str, Any] = {
     "provenance": {
         "archive": {
             # Storage API based Archive object
             # "cls": "api",
             # "storage": {
             #     "cls": "remote",
             #     "url": "http://uffizi.internal.softwareheritage.org:5002",
             # }
             # Direct access Archive object
             "cls": "direct",
             "db": {
                 "host": "db.internal.softwareheritage.org",
                 "dbname": "softwareheritage",
                 "user": "guest",
             },
         },
         "storage": {
             # Local PostgreSQL Storage
             "cls": "postgresql",
             "db": {
                 "host": "localhost",
                 "user": "postgres",
                 "password": "postgres",
                 "dbname": "provenance",
             },
             # Local MongoDB Storage
             # "cls": "mongodb",
             # "db": {
             #     "dbname": "provenance",
             # },
         },
     }
 }
 
 
 CONFIG_FILE_HELP = f"""
 \b Configuration can be loaded from a yaml file given either as --config-file
 option or the {CONFIG_ENVVAR} environment variable.
If no configuration file is specified, use the following default configuration:: \b {yaml.dump(DEFAULT_CONFIG)}""" PROVENANCE_HELP = f"""Software Heritage provenance index database tools {CONFIG_FILE_HELP} """ @swh_cli_group.group( name="provenance", context_settings=CONTEXT_SETTINGS, help=PROVENANCE_HELP ) @click.option( "-C", "--config-file", default=None, type=click.Path(exists=True, dir_okay=False, path_type=str), help="""YAML configuration file.""", ) @click.option( "-P", "--profile", default=None, type=click.Path(exists=False, dir_okay=False, path_type=str), help="""Enable profiling to specified file.""", ) @click.pass_context def cli(ctx: click.core.Context, config_file: Optional[str], profile: str) -> None: if ( config_file is None and DEFAULT_PATH is not None and config.config_exists(DEFAULT_PATH) ): config_file = DEFAULT_PATH if config_file is None: conf = DEFAULT_CONFIG else: # read_raw_config do not fail on ENOENT if not os.path.exists(config_file): raise FileNotFoundError(config_file) conf = yaml.safe_load(open(config_file, "rb")) ctx.ensure_object(dict) ctx.obj["config"] = conf if profile: import atexit import cProfile print("Profiling...") pr = cProfile.Profile() pr.enable() def exit() -> None: pr.disable() pr.dump_stats(profile) atexit.register(exit) @cli.command(name="iter-revisions") @click.argument("filename") @click.option("-a", "--track-all", default=True, type=bool) @click.option("-l", "--limit", type=int) @click.option("-m", "--min-depth", default=1, type=int) @click.option("-r", "--reuse", default=True, type=bool) +@click.option("-s", "--min-size", default=0, type=int) @click.pass_context def iter_revisions( ctx: click.core.Context, filename: str, track_all: bool, limit: Optional[int], min_depth: int, reuse: bool, + min_size: int, ) -> None: - # TODO: add file size filtering """Process a provided list of revisions.""" from . import get_archive, get_provenance from .revision import CSVRevisionIterator, revision_add archive = get_archive(**ctx.obj["config"]["provenance"]["archive"]) revisions_provider = generate_revision_tuples(filename) revisions = CSVRevisionIterator(revisions_provider, limit=limit) with get_provenance(**ctx.obj["config"]["provenance"]["storage"]) as provenance: for revision in revisions: revision_add( provenance, archive, [revision], trackall=track_all, lower=reuse, mindepth=min_depth, + minsize=min_size, ) def generate_revision_tuples( filename: str, ) -> Generator[Tuple[Sha1Git, datetime, Sha1Git], None, None]: for line in open(filename, "r"): if line.strip(): revision, date, root = line.strip().split(",") yield ( hash_to_bytes(revision), iso8601.parse_date(date, default_timezone=timezone.utc), hash_to_bytes(root), ) @cli.command(name="iter-origins") @click.argument("filename") @click.option("-l", "--limit", type=int) @click.pass_context def iter_origins(ctx: click.core.Context, filename: str, limit: Optional[int]) -> None: """Process a provided list of origins.""" from . 
import get_archive, get_provenance from .origin import CSVOriginIterator, origin_add archive = get_archive(**ctx.obj["config"]["provenance"]["archive"]) origins_provider = generate_origin_tuples(filename) origins = CSVOriginIterator(origins_provider, limit=limit) with get_provenance(**ctx.obj["config"]["provenance"]["storage"]) as provenance: for origin in origins: origin_add(provenance, archive, [origin]) def generate_origin_tuples(filename: str) -> Generator[Tuple[str, bytes], None, None]: for line in open(filename, "r"): if line.strip(): url, snapshot = line.strip().split(",") yield (url, hash_to_bytes(snapshot)) @cli.command(name="find-first") @click.argument("swhid") @click.pass_context def find_first(ctx: click.core.Context, swhid: str) -> None: """Find first occurrence of the requested blob.""" from . import get_provenance with get_provenance(**ctx.obj["config"]["provenance"]["storage"]) as provenance: occur = provenance.content_find_first(hash_to_bytes(swhid)) if occur is not None: print( f"swh:1:cnt:{hash_to_hex(occur.content)}, " f"swh:1:rev:{hash_to_hex(occur.revision)}, " f"{occur.date}, " f"{occur.origin}, " f"{os.fsdecode(occur.path)}" ) else: print(f"Cannot find a content with the id {swhid}") @cli.command(name="find-all") @click.argument("swhid") @click.option("-l", "--limit", type=int) @click.pass_context def find_all(ctx: click.core.Context, swhid: str, limit: Optional[int]) -> None: """Find all occurrences of the requested blob.""" from . import get_provenance with get_provenance(**ctx.obj["config"]["provenance"]["storage"]) as provenance: for occur in provenance.content_find_all(hash_to_bytes(swhid), limit=limit): print( f"swh:1:cnt:{hash_to_hex(occur.content)}, " f"swh:1:rev:{hash_to_hex(occur.revision)}, " f"{occur.date}, " f"{occur.origin}, " f"{os.fsdecode(occur.path)}" ) diff --git a/swh/provenance/graph.py b/swh/provenance/graph.py index df071c5..d9d8919 100644 --- a/swh/provenance/graph.py +++ b/swh/provenance/graph.py @@ -1,274 +1,275 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from __future__ import annotations from datetime import datetime, timezone import os from typing import Any, Dict, Optional, Set from swh.core.statsd import statsd from swh.model.hashutil import hash_to_hex from swh.model.model import Sha1Git from .archive import ArchiveInterface from .interface import ProvenanceInterface from .model import DirectoryEntry, RevisionEntry GRAPH_DURATION_METRIC = "swh_provenance_graph_duration_seconds" GRAPH_OPERATIONS_METRIC = "swh_provenance_graph_operations_total" UTCMIN = datetime.min.replace(tzinfo=timezone.utc) class HistoryNode: def __init__( self, entry: RevisionEntry, is_head: bool = False, in_history: bool = False ) -> None: self.entry = entry # A revision is `is_head` if it is directly pointed by an origin (ie. a head # revision for some snapshot) self.is_head = is_head # A revision is `in_history` if it appears in the history graph of an already # processed revision in the provenance database self.in_history = in_history # XXX: the current simplified version of the origin-revision layer algorithm # does not use this previous two flags at all. They are kept for now but might # be removed in the future (hence, RevisionEntry might be used instead of # HistoryNode). 
def __str__(self) -> str: return f"<{self.entry}: is_head={self.is_head}, in_history={self.in_history}>" def as_dict(self) -> Dict[str, Any]: return { "rev": hash_to_hex(self.entry.id), "is_head": self.is_head, "in_history": self.in_history, } class HistoryGraph: @statsd.timed(metric=GRAPH_DURATION_METRIC, tags={"method": "build_history_graph"}) def __init__( self, archive: ArchiveInterface, provenance: ProvenanceInterface, revision: RevisionEntry, ) -> None: self._head = HistoryNode( revision, is_head=provenance.revision_visited(revision), in_history=provenance.revision_in_history(revision), ) self._graph: Dict[HistoryNode, Set[HistoryNode]] = {} stack = [self._head] while stack: current = stack.pop() if current not in self._graph: self._graph[current] = set() current.entry.retrieve_parents(archive) for parent in current.entry.parents: node = HistoryNode( parent, is_head=provenance.revision_visited(parent), in_history=provenance.revision_in_history(parent), ) self._graph[current].add(node) stack.append(node) @property def head(self) -> HistoryNode: return self._head @property def parents(self) -> Dict[HistoryNode, Set[HistoryNode]]: return self._graph def __str__(self) -> str: return f" Dict[str, Any]: return { "head": self.head.as_dict(), "graph": { hash_to_hex(node.entry.id): sorted( [parent.as_dict() for parent in parents], key=lambda d: d["rev"], ) for node, parents in self._graph.items() }, } class IsochroneNode: def __init__( self, entry: DirectoryEntry, dbdate: Optional[datetime] = None, depth: int = 0, prefix: bytes = b"", ) -> None: self.entry = entry self.depth = depth # dbdate is the maxdate for this node that comes from the DB self._dbdate: Optional[datetime] = dbdate # maxdate is set by the maxdate computation algorithm self.maxdate: Optional[datetime] = None # known is True if this node is already known in the db; either because # the current directory actually exists in the database, or because all # the content of the current directory is known (subdirectories and files) self.known = self.dbdate is not None self.invalid = False self.path = os.path.join(prefix, self.entry.name) if prefix else self.entry.name self.children: Set[IsochroneNode] = set() @property def dbdate(self) -> Optional[datetime]: # use a property to make this attribute (mostly) read-only return self._dbdate def invalidate(self) -> None: statsd.increment( metric=GRAPH_OPERATIONS_METRIC, tags={"method": "invalidate_frontier"} ) self._dbdate = None self.maxdate = None self.known = False self.invalid = True def add_directory( self, child: DirectoryEntry, date: Optional[datetime] = None ) -> IsochroneNode: # we should not be processing this node (ie add subdirectories or files) if it's # actually known by the provenance DB assert self.dbdate is None node = IsochroneNode(child, dbdate=date, depth=self.depth + 1, prefix=self.path) self.children.add(node) return node def __str__(self) -> str: return ( f"<{self.entry}: depth={self.depth}, " f"dbdate={self.dbdate}, maxdate={self.maxdate}, " f"known={self.known}, invalid={self.invalid}, path={self.path!r}, " f"children=[{', '.join(str(child) for child in self.children)}]>" ) def __eq__(self, other: Any) -> bool: return isinstance(other, IsochroneNode) and self.__dict__ == other.__dict__ def __hash__(self) -> int: # only immutable attributes are considered to compute hash return hash((self.entry, self.depth, self.path)) @statsd.timed(metric=GRAPH_DURATION_METRIC, tags={"method": "build_isochrone_graph"}) def build_isochrone_graph( archive: ArchiveInterface, 
provenance: ProvenanceInterface, revision: RevisionEntry, directory: DirectoryEntry, + minsize: int = 0, ) -> IsochroneNode: assert revision.date is not None assert revision.root == directory.id # this function process a revision in 2 steps: # # 1. build the tree structure of IsochroneNode objects (one INode per # directory under the root directory of the revision but not following # known subdirectories), and gather the dates from the DB for already # known objects; for files, just keep all the dates in a global 'fdates' # dict; note that in this step, we will only recurse the directories # that are not already known. # # 2. compute the maxdate for each node of the tree that was not found in the DB. # Build the nodes structure root_date = provenance.directory_get_date_in_isochrone_frontier(directory) root = IsochroneNode(directory, dbdate=root_date) stack = [root] fdates: Dict[Sha1Git, datetime] = {} # map {file_id: date} while stack: current = stack.pop() if current.dbdate is None or current.dbdate > revision.date: # If current directory has an associated date in the isochrone frontier that # is greater or equal to the current revision's one, it should be ignored as # the revision is being processed out of order. if current.dbdate is not None and current.dbdate > revision.date: current.invalidate() # Pre-query all known dates for directories in the current directory # for the provenance object to have them cached and (potentially) improve # performance. - current.entry.retrieve_children(archive) + current.entry.retrieve_children(archive, minsize=minsize) ddates = provenance.directory_get_dates_in_isochrone_frontier( current.entry.dirs ) for dir in current.entry.dirs: # Recursively analyse subdirectory nodes node = current.add_directory(dir, date=ddates.get(dir.id, None)) stack.append(node) fdates.update(provenance.content_get_early_dates(current.entry.files)) # Precalculate max known date for each node in the graph (only directory nodes are # pushed to the stack). stack = [root] while stack: current = stack.pop() # Current directory node is known if it already has an assigned date (ie. it was # already seen as an isochrone frontier). if current.known: assert current.maxdate is None current.maxdate = current.dbdate else: if any(x.maxdate is None for x in current.children): # at least one child of current has no maxdate yet # Current node needs to be analysed again after its children. stack.append(current) for child in current.children: if child.maxdate is None: # if child.maxdate is None, it must be processed stack.append(child) else: # all the files and directories under current have a maxdate, # we can infer the maxdate for current directory assert current.maxdate is None # if all content is already known, update current directory info. current.maxdate = max( [UTCMIN] + [ child.maxdate for child in current.children if child.maxdate is not None # unnecessary, but needed for mypy ] + [ fdates.get(file.id, revision.date) for file in current.entry.files ] ) if current.maxdate <= revision.date: current.known = ( # true if all subdirectories are known all(child.known for child in current.children) # true if all files are in fdates, i.e. 
if all files were known # *before building this isochrone graph node* # Note: the 'all()' is lazy: will stop iterating as soon as # possible and all((file.id in fdates) for file in current.entry.files) ) else: # at least one content is being processed out-of-order, then current # node should be treated as unknown current.maxdate = revision.date current.known = False return root diff --git a/swh/provenance/model.py b/swh/provenance/model.py index e9d4c7a..13dd696 100644 --- a/swh/provenance/model.py +++ b/swh/provenance/model.py @@ -1,147 +1,147 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from __future__ import annotations from datetime import datetime from typing import Iterable, Iterator, List, Optional from swh.model.model import Origin, Sha1Git from .archive import ArchiveInterface class OriginEntry: def __init__(self, url: str, snapshot: Sha1Git) -> None: self.url = url self.id = Origin(url=self.url).id self.snapshot = snapshot self._revisions: Optional[List[RevisionEntry]] = None def retrieve_revisions(self, archive: ArchiveInterface) -> None: if self._revisions is None: self._revisions = [ RevisionEntry(rev) for rev in archive.snapshot_get_heads(self.snapshot) ] @property def revisions(self) -> Iterator[RevisionEntry]: if self._revisions is None: raise RuntimeError( "Revisions of this node has not yet been retrieved. " "Please call retrieve_revisions() before using this property." ) return (x for x in self._revisions) def __str__(self) -> str: return f"" class RevisionEntry: def __init__( self, id: Sha1Git, date: Optional[datetime] = None, root: Optional[Sha1Git] = None, parents: Optional[Iterable[Sha1Git]] = None, ) -> None: self.id = id self.date = date assert self.date is None or self.date.tzinfo is not None self.root = root self._parents_ids = parents self._parents_entries: Optional[List[RevisionEntry]] = None def retrieve_parents(self, archive: ArchiveInterface) -> None: if self._parents_entries is None: if self._parents_ids is None: self._parents_ids = archive.revision_get_parents(self.id) self._parents_entries = [RevisionEntry(id) for id in self._parents_ids] @property def parents(self) -> Iterator[RevisionEntry]: if self._parents_entries is None: raise RuntimeError( "Parents of this node has not yet been retrieved. " "Please call retrieve_parents() before using this property." 
) return (x for x in self._parents_entries) def __str__(self) -> str: return f"" def __eq__(self, other) -> bool: return isinstance(other, RevisionEntry) and self.id == other.id def __hash__(self) -> int: return hash(self.id) class DirectoryEntry: def __init__(self, id: Sha1Git, name: bytes = b"") -> None: self.id = id self.name = name self._files: Optional[List[FileEntry]] = None self._dirs: Optional[List[DirectoryEntry]] = None - def retrieve_children(self, archive: ArchiveInterface) -> None: + def retrieve_children(self, archive: ArchiveInterface, minsize: int = 0) -> None: if self._files is None and self._dirs is None: self._files = [] self._dirs = [] - for child in archive.directory_ls(self.id): + for child in archive.directory_ls(self.id, minsize=minsize): if child["type"] == "dir": self._dirs.append( DirectoryEntry(child["target"], name=child["name"]) ) elif child["type"] == "file": self._files.append(FileEntry(child["target"], child["name"])) @property def files(self) -> Iterator[FileEntry]: if self._files is None: raise RuntimeError( "Children of this node has not yet been retrieved. " "Please call retrieve_children() before using this property." ) return (x for x in self._files) @property def dirs(self) -> Iterator[DirectoryEntry]: if self._dirs is None: raise RuntimeError( "Children of this node has not yet been retrieved. " "Please call retrieve_children() before using this property." ) return (x for x in self._dirs) def __str__(self) -> str: return f"" def __eq__(self, other) -> bool: return isinstance(other, DirectoryEntry) and (self.id, self.name) == ( other.id, other.name, ) def __hash__(self) -> int: return hash((self.id, self.name)) class FileEntry: def __init__(self, id: Sha1Git, name: bytes) -> None: self.id = id self.name = name def __str__(self) -> str: return f"" def __eq__(self, other) -> bool: return isinstance(other, FileEntry) and (self.id, self.name) == ( other.id, other.name, ) def __hash__(self) -> int: return hash((self.id, self.name)) diff --git a/swh/provenance/postgresql/archive.py b/swh/provenance/postgresql/archive.py index 26ed8b4..f84fe8c 100644 --- a/swh/provenance/postgresql/archive.py +++ b/swh/provenance/postgresql/archive.py @@ -1,124 +1,125 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Any, Dict, Iterable, List from methodtools import lru_cache import psycopg2.extensions from swh.core.statsd import statsd from swh.model.model import Sha1Git from swh.storage import get_storage ARCHIVE_DURATION_METRIC = "swh_provenance_archive_direct_duration_seconds" class ArchivePostgreSQL: def __init__(self, conn: psycopg2.extensions.connection) -> None: self.storage = get_storage( "postgresql", db=conn.dsn, objstorage={"cls": "memory"} ) self.conn = conn - def directory_ls(self, id: Sha1Git) -> Iterable[Dict[str, Any]]: - entries = self._directory_ls(id) + def directory_ls(self, id: Sha1Git, minsize: int = 0) -> Iterable[Dict[str, Any]]: + entries = self._directory_ls(id, minsize=minsize) yield from entries @lru_cache(maxsize=100000) @statsd.timed(metric=ARCHIVE_DURATION_METRIC, tags={"method": "directory_ls"}) - def _directory_ls(self, id: Sha1Git) -> List[Dict[str, Any]]: - # TODO: add file size filtering + def _directory_ls(self, id: Sha1Git, minsize: int = 0) -> List[Dict[str, Any]]: with self.conn.cursor() as cursor: cursor.execute( """ 
WITH dir AS (SELECT id AS dir_id, dir_entries, file_entries, rev_entries FROM directory WHERE id=%s), ls_d AS (SELECT dir_id, UNNEST(dir_entries) AS entry_id FROM dir), ls_f AS (SELECT dir_id, UNNEST(file_entries) AS entry_id FROM dir), ls_r AS (SELECT dir_id, UNNEST(rev_entries) AS entry_id FROM dir) (SELECT 'dir'::directory_entry_type AS type, e.target, e.name, NULL::sha1_git FROM ls_d LEFT JOIN directory_entry_dir e ON ls_d.entry_id=e.id) UNION (WITH known_contents AS (SELECT 'file'::directory_entry_type AS type, e.target, e.name, c.sha1_git FROM ls_f LEFT JOIN directory_entry_file e ON ls_f.entry_id=e.id - INNER JOIN content c ON e.target=c.sha1_git) + INNER JOIN content c ON e.target=c.sha1_git + WHERE c.length >= %s + ) SELECT * FROM known_contents UNION (SELECT 'file'::directory_entry_type AS type, e.target, e.name, c.sha1_git FROM ls_f LEFT JOIN directory_entry_file e ON ls_f.entry_id=e.id LEFT JOIN skipped_content c ON e.target=c.sha1_git WHERE NOT EXISTS ( SELECT 1 FROM known_contents WHERE known_contents.sha1_git=e.target ) + AND c.length >= %s ) ) """, - (id,), + (id, minsize, minsize), ) return [ {"type": row[0], "target": row[1], "name": row[2]} for row in cursor ] @statsd.timed( metric=ARCHIVE_DURATION_METRIC, tags={"method": "revision_get_parents"} ) def revision_get_parents(self, id: Sha1Git) -> Iterable[Sha1Git]: with self.conn.cursor() as cursor: cursor.execute( """ SELECT RH.parent_id::bytea FROM revision_history AS RH WHERE RH.id=%s ORDER BY RH.parent_rank """, (id,), ) - # There should be at most one row anyway yield from (row[0] for row in cursor) @statsd.timed(metric=ARCHIVE_DURATION_METRIC, tags={"method": "snapshot_get_heads"}) def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]: with self.conn.cursor() as cursor: cursor.execute( """ WITH snaps AS (SELECT object_id FROM snapshot WHERE snapshot.id=%s), heads AS ((SELECT R.id, R.date FROM snaps JOIN snapshot_branches AS BS ON (snaps.object_id=BS.snapshot_id) JOIN snapshot_branch AS B ON (BS.branch_id=B.object_id) JOIN revision AS R ON (B.target=R.id) WHERE B.target_type='revision'::snapshot_target) UNION (SELECT RV.id, RV.date FROM snaps JOIN snapshot_branches AS BS ON (snaps.object_id=BS.snapshot_id) JOIN snapshot_branch AS B ON (BS.branch_id=B.object_id) JOIN release AS RL ON (B.target=RL.id) JOIN revision AS RV ON (RL.target=RV.id) WHERE B.target_type='release'::snapshot_target AND RL.target_type='revision'::object_type) ORDER BY date, id) SELECT id FROM heads """, (id,), ) yield from (row[0] for row in cursor) diff --git a/swh/provenance/revision.py b/swh/provenance/revision.py index 05dfe5f..3f7527f 100644 --- a/swh/provenance/revision.py +++ b/swh/provenance/revision.py @@ -1,240 +1,246 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timezone import os from typing import Generator, Iterable, Iterator, List, Optional, Tuple from swh.core.statsd import statsd from swh.model.model import Sha1Git from .archive import ArchiveInterface from .graph import IsochroneNode, build_isochrone_graph from .interface import ProvenanceInterface from .model import DirectoryEntry, RevisionEntry REVISION_DURATION_METRIC = "swh_provenance_revision_content_layer_duration_seconds" class CSVRevisionIterator: """Iterator over revisions typically present in the given CSV file. 
The input is an iterator that produces 3 elements per row: (id, date, root) where: - id: is the id (sha1_git) of the revision - date: is the author date - root: sha1 of the directory """ def __init__( self, revisions: Iterable[Tuple[Sha1Git, datetime, Sha1Git]], limit: Optional[int] = None, ) -> None: self.revisions: Iterator[Tuple[Sha1Git, datetime, Sha1Git]] if limit is not None: from itertools import islice self.revisions = islice(revisions, limit) else: self.revisions = iter(revisions) def __iter__(self) -> Generator[RevisionEntry, None, None]: for id, date, root in self.revisions: if date.tzinfo is None: date = date.replace(tzinfo=timezone.utc) yield RevisionEntry(id, date=date, root=root) @statsd.timed(metric=REVISION_DURATION_METRIC, tags={"method": "main"}) def revision_add( provenance: ProvenanceInterface, archive: ArchiveInterface, revisions: List[RevisionEntry], trackall: bool = True, lower: bool = True, mindepth: int = 1, + minsize: int = 0, commit: bool = True, ) -> None: for revision in revisions: assert revision.date is not None assert revision.root is not None # Processed content starting from the revision's root directory. date = provenance.revision_get_date(revision) if date is None or revision.date < date: graph = build_isochrone_graph( archive, provenance, revision, DirectoryEntry(revision.root), + minsize=minsize, ) - # TODO: add file size filtering revision_process_content( archive, provenance, revision, graph, trackall=trackall, lower=lower, mindepth=mindepth, + minsize=minsize, ) if commit: provenance.flush() @statsd.timed(metric=REVISION_DURATION_METRIC, tags={"method": "process_content"}) def revision_process_content( archive: ArchiveInterface, provenance: ProvenanceInterface, revision: RevisionEntry, graph: IsochroneNode, trackall: bool = True, lower: bool = True, mindepth: int = 1, + minsize: int = 0, ) -> None: assert revision.date is not None provenance.revision_add(revision) stack = [graph] while stack: current = stack.pop() if current.dbdate is not None: assert current.dbdate <= revision.date if trackall: # Current directory is an outer isochrone frontier for a previously # processed revision. It should be reused as is. provenance.directory_add_to_revision( revision, current.entry, current.path ) else: assert current.maxdate is not None # Current directory is not an outer isochrone frontier for any previous # revision. It might be eligible for this one. if is_new_frontier( current, revision=revision, trackall=trackall, lower=lower, mindepth=mindepth, ): # Outer frontier should be moved to current position in the isochrone # graph. This is the first time this directory is found in the isochrone # frontier. provenance.directory_set_date_in_isochrone_frontier( current.entry, current.maxdate ) if trackall: provenance.directory_add_to_revision( revision, current.entry, current.path ) - flatten_directory(archive, provenance, current.entry) + flatten_directory( + archive, provenance, current.entry, minsize=minsize + ) else: # If current node is an invalidated frontier, update its date for future # revisions to get the proper value. if current.invalid: provenance.directory_set_date_in_isochrone_frontier( current.entry, current.maxdate ) # No point moving the frontier here. Either there are no files or they # are being seen for the first time here. Add all blobs to current # revision updating date if necessary, and recursively analyse # subdirectories as candidates to the outer frontier. 
for blob in current.entry.files: date = provenance.content_get_early_date(blob) if date is None or revision.date < date: provenance.content_set_early_date(blob, revision.date) provenance.content_add_to_revision(revision, blob, current.path) for child in current.children: stack.append(child) @statsd.timed(metric=REVISION_DURATION_METRIC, tags={"method": "flatten_directory"}) def flatten_directory( archive: ArchiveInterface, provenance: ProvenanceInterface, directory: DirectoryEntry, + minsize: int = 0, ) -> None: """Recursively retrieve all the files of 'directory' and insert them in the 'provenance' database in the 'content_to_directory' table. """ stack = [(directory, b"")] while stack: current, prefix = stack.pop() - current.retrieve_children(archive) + current.retrieve_children(archive, minsize=minsize) for f_child in current.files: # Add content to the directory with the computed prefix. provenance.content_add_to_directory(directory, f_child, prefix) for d_child in current.dirs: # Recursively walk the child directory. stack.append((d_child, os.path.join(prefix, d_child.name))) def is_new_frontier( node: IsochroneNode, revision: RevisionEntry, trackall: bool = True, lower: bool = True, mindepth: int = 1, ) -> bool: assert node.maxdate is not None # for mypy assert revision.date is not None # idem if trackall: # The only real condition for a directory to be a frontier is that its content # is already known and its maxdate is less (or equal) than current revision's # date. Checking mindepth is meant to skip root directories (or any arbitrary # depth) to improve the result. The option lower tries to maximize the reuse # rate of previously defined frontiers by keeping them low in the directory # tree. return ( node.known and node.maxdate <= revision.date # all content is earlier than revision and node.depth >= mindepth # current node is deeper than the min allowed depth and (has_blobs(node) if lower else True) # there is at least one blob in it ) else: # If we are only tracking first occurrences, we want to ensure that all first # occurrences end up in the content_early_in_rev relation. Thus, we force for # every blob outside a frontier to have an strictly earlier date. return ( node.maxdate < revision.date # all content is earlier than revision and node.depth >= mindepth # deeper than the min allowed depth and (has_blobs(node) if lower else True) # there is at least one blob ) def has_blobs(node: IsochroneNode) -> bool: # We may want to look for files in different ways to decide whether to define a # frontier or not: # 1. Only files in current node: return any(node.entry.files) # 2. Files anywhere in the isochrone graph # stack = [node] # while stack: # current = stack.pop() # if any( # map(lambda child: isinstance(child.entry, FileEntry), current.children)): # return True # else: # # All children are directory entries. # stack.extend(current.children) # return False # 3. Files in the intermediate directories between current node and any previously # defined frontier: # TODO: complete this case! 
# return any( # map(lambda child: isinstance(child.entry, FileEntry), node.children) # ) or all( # map( # lambda child: ( # not (isinstance(child.entry, DirectoryEntry) and child.date is None) # ) # or has_blobs(child), # node.children, # ) # ) diff --git a/swh/provenance/storage/archive.py b/swh/provenance/storage/archive.py index 417f614..3093c29 100644 --- a/swh/provenance/storage/archive.py +++ b/swh/provenance/storage/archive.py @@ -1,71 +1,73 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime from typing import Any, Dict, Iterable, Set, Tuple from swh.core.statsd import statsd from swh.model.model import ObjectType, Sha1Git, TargetType from swh.storage.interface import StorageInterface ARCHIVE_DURATION_METRIC = "swh_provenance_archive_api_duration_seconds" class ArchiveStorage: def __init__(self, storage: StorageInterface) -> None: self.storage = storage @statsd.timed(metric=ARCHIVE_DURATION_METRIC, tags={"method": "directory_ls"}) - def directory_ls(self, id: Sha1Git) -> Iterable[Dict[str, Any]]: - # TODO: add file size filtering + def directory_ls(self, id: Sha1Git, minsize: int = 0) -> Iterable[Dict[str, Any]]: for entry in self.storage.directory_ls(id): - yield { - "name": entry["name"], - "target": entry["target"], - "type": entry["type"], - } + if entry["type"] == "dir" or ( + entry["type"] == "file" and entry["length"] >= minsize + ): + yield { + "name": entry["name"], + "target": entry["target"], + "type": entry["type"], + } @statsd.timed( metric=ARCHIVE_DURATION_METRIC, tags={"method": "revision_get_parents"} ) def revision_get_parents(self, id: Sha1Git) -> Iterable[Sha1Git]: rev = self.storage.revision_get([id])[0] if rev is not None: yield from rev.parents @statsd.timed(metric=ARCHIVE_DURATION_METRIC, tags={"method": "snapshot_get_heads"}) def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]: from swh.core.utils import grouper from swh.storage.algos.snapshot import snapshot_get_all_branches snapshot = snapshot_get_all_branches(self.storage, id) assert snapshot is not None targets_set = set() releases_set = set() if snapshot is not None: for branch in snapshot.branches: if snapshot.branches[branch].target_type == TargetType.REVISION: targets_set.add(snapshot.branches[branch].target) elif snapshot.branches[branch].target_type == TargetType.RELEASE: releases_set.add(snapshot.branches[branch].target) batchsize = 100 for releases in grouper(releases_set, batchsize): targets_set.update( release.target for release in self.storage.release_get(list(releases)) if release is not None and release.target_type == ObjectType.REVISION ) revisions: Set[Tuple[datetime, Sha1Git]] = set() for targets in grouper(targets_set, batchsize): revisions.update( (revision.date.to_datetime(), revision.id) for revision in self.storage.revision_get(list(targets)) if revision is not None and revision.date is not None ) yield from (head for _, head in sorted(revisions))
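Note on the ArchiveStorage change above: directory_ls now only yields "dir" and "file" entries, and "file" entries are filtered by the storage-reported length, while subdirectories (which carry no length) are always kept; "rev" entries (submodules) are no longer yielded at all. A minimal sketch of the expected behaviour, assuming an in-memory storage instance and a placeholder directory id (both illustrative, not part of the patch):

    from swh.provenance.storage.archive import ArchiveStorage
    from swh.storage import get_storage

    storage = get_storage(cls="memory")       # assumption: any StorageInterface instance works here
    archive = ArchiveStorage(storage)
    directory_id = bytes.fromhex("00" * 20)   # placeholder sha1_git, not a real directory

    # With the default minsize=0 every dir/file entry is yielded; with a positive
    # threshold, file entries whose length is below it are skipped.
    for entry in archive.directory_ls(directory_id, minsize=1000):
        print(entry["type"], entry["name"], entry["target"])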
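Because ArchiveInterface is a runtime_checkable Protocol, the new minsize keyword is now part of the contract that every archive implementation (ArchivePostgreSQL, ArchiveStorage, and any test double) has to honour. A hedged sketch of a hypothetical stub that would still satisfy an isinstance() check against the protocol; the class and its data layout are invented for illustration only:

    from typing import Any, Dict, Iterable, List

    from swh.model.model import Sha1Git
    from swh.provenance.archive import ArchiveInterface


    class FakeArchive:
        # Hypothetical in-memory stand-in, e.g. for tests; not part of the patch.
        storage = None  # a real implementation would expose a StorageInterface here

        def __init__(self, directories: Dict[Sha1Git, List[Dict[str, Any]]]) -> None:
            self.directories = directories

        def directory_ls(self, id: Sha1Git, minsize: int = 0) -> Iterable[Dict[str, Any]]:
            for entry in self.directories.get(id, []):
                # keep every subdirectory, drop files below the size threshold
                if entry["type"] == "dir" or entry.get("length", 0) >= minsize:
                    yield {k: entry[k] for k in ("name", "target", "type")}

        def revision_get_parents(self, id: Sha1Git) -> Iterable[Sha1Git]:
            return []

        def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]:
            return []


    assert isinstance(FakeArchive({}), ArchiveInterface)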
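The new --min-size option on iter-revisions is simply forwarded to revision_add, which in turn passes it down to build_isochrone_graph, revision_process_content and flatten_directory. A minimal sketch of the equivalent direct call, assuming a locally configured archive and provenance storage plus an illustrative CSV file and threshold (none of these concrete values come from the patch):

    from datetime import timezone

    import iso8601

    from swh.model.hashutil import hash_to_bytes
    from swh.provenance import get_archive, get_provenance
    from swh.provenance.model import RevisionEntry
    from swh.provenance.revision import revision_add

    # assumptions: a storage-API-backed archive and a local provenance database
    archive = get_archive(cls="api", storage={"cls": "remote", "url": "http://localhost:5002"})
    with get_provenance(cls="postgresql", db={"host": "localhost", "dbname": "provenance"}) as provenance:
        with open("revisions.csv") as f:  # rows: <revision sha1>,<ISO8601 date>,<root directory sha1>
            for line in f:
                rev, date, root = line.strip().split(",")
                revision = RevisionEntry(
                    hash_to_bytes(rev),
                    date=iso8601.parse_date(date, default_timezone=timezone.utc),
                    root=hash_to_bytes(root),
                )
                # minsize=1000 skips content blobs smaller than 1000 bytes, matching
                # `swh provenance iter-revisions --min-size 1000 revisions.csv`
                revision_add(provenance, archive, [revision], minsize=1000)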
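One side effect worth noting in ArchivePostgreSQL: _directory_ls is memoized with methodtools.lru_cache, so after this patch the minsize argument becomes part of the cache key, and listing the same directory with two different thresholds produces two distinct cache entries. A small sketch of that consequence, with a hypothetical connection and placeholder directory id:

    import psycopg2

    from swh.provenance.postgresql.archive import ArchivePostgreSQL

    conn = psycopg2.connect(dbname="softwareheritage")  # assumption: a local archive database
    archive = ArchivePostgreSQL(conn)
    directory_id = bytes.fromhex("00" * 20)             # placeholder sha1_git

    entries_all = list(archive.directory_ls(directory_id))                # cached under minsize=0
    entries_big = list(archive.directory_ls(directory_id, minsize=4096))  # separate cache entry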