diff --git a/swh/provenance/archive.py b/swh/provenance/archive.py index 42ca2f0..651c972 100644 --- a/swh/provenance/archive.py +++ b/swh/provenance/archive.py @@ -1,27 +1,27 @@ -from typing import Any, Dict, List +from typing import Any, Dict, Iterable, List from typing_extensions import Protocol, runtime_checkable @runtime_checkable class ArchiveInterface(Protocol): def directory_ls(self, id: bytes) -> List[Dict[str, Any]]: ... def iter_origins(self): ... def iter_origin_visits(self, origin: str): ... def iter_origin_visit_statuses(self, origin: str, visit: int): ... - def release_get(self, ids: List[bytes]): + def release_get(self, ids: Iterable[bytes]): ... - def revision_get(self, ids: List[bytes]): + def revision_get(self, ids: Iterable[bytes]): ... def snapshot_get_all_branches(self, snapshot: bytes): ... diff --git a/swh/provenance/cli.py b/swh/provenance/cli.py index 83c8ea2..29d2390 100644 --- a/swh/provenance/cli.py +++ b/swh/provenance/cli.py @@ -1,203 +1,203 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # WARNING: do not import unnecessary things here to keep cli startup time under # control import os from typing import Any, Dict, Optional import click import yaml from swh.core import config from swh.core.cli import CONTEXT_SETTINGS from swh.core.cli import swh as swh_cli_group from swh.model.hashutil import hash_to_bytes, hash_to_hex # All generic config code should reside in swh.core.config CONFIG_ENVVAR = "SWH_CONFIG_FILE" DEFAULT_CONFIG_PATH = os.path.join(click.get_app_dir("swh"), "global.yml") DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, DEFAULT_CONFIG_PATH) DEFAULT_CONFIG: Dict[str, Any] = { "archive": { "cls": "api", "storage": { "cls": "remote", "url": "http://uffizi.internal.softwareheritage.org:5002", } # "cls": "direct", # "db": { # "host": "db.internal.softwareheritage.org", # "dbname": "softwareheritage", # "user": "guest" # } }, "provenance": {"cls": "local", "db": {"host": "localhost", "dbname": "provenance"}}, } CONFIG_FILE_HELP = f"""Configuration file: \b The CLI option or the environment variable will fail if invalid. CLI option is checked first. Then, environment variable {CONFIG_ENVVAR} is checked. Then, if cannot load the default path, a set of default values are used. Default config path is {DEFAULT_CONFIG_PATH}. Default config values are: \b {yaml.dump(DEFAULT_CONFIG)}""" PROVENANCE_HELP = f"""Software Heritage Scanner tools. 
{CONFIG_FILE_HELP}""" @swh_cli_group.group( name="provenance", context_settings=CONTEXT_SETTINGS, help=PROVENANCE_HELP ) @click.option( "-C", "--config-file", default=None, type=click.Path(exists=False, dir_okay=False, path_type=str), help="""YAML configuration file.""", ) @click.option( "-P", "--profile", default=None, type=click.Path(exists=False, dir_okay=False, path_type=str), help="""Enable profiling to specified file.""", ) @click.pass_context def cli(ctx, config_file: Optional[str], profile: str): if config_file is None and config.config_exists(DEFAULT_PATH): config_file = DEFAULT_PATH if config_file is None: conf = DEFAULT_CONFIG else: # read_raw_config do not fail on ENOENT if not config.config_exists(config_file): raise FileNotFoundError(config_file) conf = config.read_raw_config(config.config_basepath(config_file)) conf = config.merge_configs(DEFAULT_CONFIG, conf) ctx.ensure_object(dict) ctx.obj["config"] = conf if profile: import atexit import cProfile print("Profiling...") pr = cProfile.Profile() pr.enable() def exit(): pr.disable() pr.dump_stats(profile) atexit.register(exit) @cli.command(name="create", deprecated=True) @click.option("--maintenance-db", default=None) @click.option("--drop/--no-drop", "drop_db", default=False) @click.pass_context def create(ctx, maintenance_db, drop_db): """Deprecated, please use: swh db create provenance and swh db init provenance instead. """ @cli.command(name="iter-revisions") @click.argument("filename") @click.option("-l", "--limit", type=int) @click.option("-m", "--min-depth", default=1, type=int) @click.option("-r", "--reuse", default=True, type=bool) @click.pass_context def iter_revisions(ctx, filename, limit, min_depth, reuse): # TODO: add file size filtering """Process a provided list of revisions.""" from . import get_archive, get_provenance from .provenance import revision_add from .revision import CSVRevisionIterator archive = get_archive(**ctx.obj["config"]["archive"]) provenance = get_provenance(**ctx.obj["config"]["provenance"]) revisions_provider = ( line.strip().split(",") for line in open(filename, "r") if line.strip() ) revisions = CSVRevisionIterator(revisions_provider, archive, limit=limit) for revision in revisions: revision_add(provenance, archive, revision, lower=reuse, mindepth=min_depth) @cli.command(name="iter-origins") @click.argument("filename") @click.option("-l", "--limit", type=int) @click.pass_context def iter_origins(ctx, filename, limit): """Process a provided list of origins.""" from . import get_archive, get_provenance from .origin import FileOriginIterator from .provenance import origin_add archive = get_archive(**ctx.obj["config"]["archive"]) provenance = get_provenance(**ctx.obj["config"]["provenance"]) for origin in FileOriginIterator(filename, archive, limit=limit): - origin_add(provenance, origin) + origin_add(archive, provenance, origin) @cli.command(name="find-first") @click.argument("swhid") @click.pass_context def find_first(ctx, swhid): """Find first occurrence of the requested blob.""" from . 
import get_provenance provenance = get_provenance(**ctx.obj["config"]["provenance"]) # TODO: return a dictionary with proper keys for each field row = provenance.content_find_first(hash_to_bytes(swhid)) if row is not None: print( "swh:1:cnt:{cnt}, swh:1:rev:{rev}, {date}, {path}".format( cnt=hash_to_hex(row[0]), rev=hash_to_hex(row[1]), date=row[2], path=os.fsdecode(row[3]), ) ) else: print(f"Cannot find a content with the id {swhid}") @cli.command(name="find-all") @click.argument("swhid") @click.option("-l", "--limit", type=int) @click.pass_context def find_all(ctx, swhid, limit): """Find all occurrences of the requested blob.""" from swh.provenance import get_provenance provenance = get_provenance(**ctx.obj["config"]["provenance"]) # TODO: return a dictionary with proper keys for each field for row in provenance.content_find_all(hash_to_bytes(swhid), limit=limit): print( "swh:1:cnt:{cnt}, swh:1:rev:{rev}, {date}, {path}".format( cnt=hash_to_hex(row[0]), rev=hash_to_hex(row[1]), date=row[2], path=os.fsdecode(row[3]), ) ) diff --git a/swh/provenance/model.py b/swh/provenance/model.py index 6c4707d..a1bd4b5 100644 --- a/swh/provenance/model.py +++ b/swh/provenance/model.py @@ -1,32 +1,75 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from datetime import datetime +from typing import Iterable, List, Optional, Union + from .archive import ArchiveInterface -class TreeEntry: - def __init__(self, id: bytes, name: bytes): +class OriginEntry: + def __init__(self, url, revisions: Iterable["RevisionEntry"], id=None): self.id = id - self.name = name + self.url = url + self.revisions = revisions -class DirectoryEntry(TreeEntry): - def __init__(self, archive: ArchiveInterface, id: bytes, name: bytes): - super().__init__(id, name) - self.archive = archive - self.children = None +class RevisionEntry: + def __init__( + self, + id: bytes, + date: Optional[datetime] = None, + root: Optional[bytes] = None, + parents: Optional[Iterable[bytes]] = None, + ): + self.id = id + self.date = date + assert self.date is None or self.date.tzinfo is not None + self.root = root + self._parents = parents + self._nodes: List[RevisionEntry] = [] - def __iter__(self): - if self.children is None: - self.children = [] - for child in self.archive.directory_ls(self.id): - if child["type"] == "dir": - self.children.append( - DirectoryEntry(self.archive, child["target"], child["name"]) + def parents(self, archive: ArchiveInterface): + if self._parents is None: + # XXX: no check is done to ensure node.id is a known revision in + # the SWH archive + self._parents = archive.revision_get([self.id])[0].parents + if self._parents: + self._nodes = [ + RevisionEntry( + id=rev.id, + root=rev.directory, + date=rev.date, + parents=rev.parents, ) + for rev in archive.revision_get(self._parents) + if rev + ] + yield from self._nodes - elif child["type"] == "file": - self.children.append(FileEntry(child["target"], child["name"])) - return iter(self.children) +class DirectoryEntry: + def __init__(self, id: bytes, name: bytes): + self.id = id + self.name = name + self._children: Optional[List[Union[DirectoryEntry, FileEntry]]] = None + + def ls(self, archive: ArchiveInterface): + if self._children is None: + self._children = [] + for child in archive.directory_ls(self.id): + if child["type"] == "dir": + self._children.append( + 
DirectoryEntry(child["target"], child["name"]) + ) + elif child["type"] == "file": + self._children.append(FileEntry(child["target"], child["name"])) + yield from self._children -class FileEntry(TreeEntry): - pass +class FileEntry: + def __init__(self, id: bytes, name: bytes): + self.id = id + self.name = name diff --git a/swh/provenance/origin.py b/swh/provenance/origin.py index 2b75685..c507faf 100644 --- a/swh/provenance/origin.py +++ b/swh/provenance/origin.py @@ -1,102 +1,89 @@ from typing import Optional from swh.model.model import ObjectType, Origin, TargetType from .archive import ArchiveInterface -from .revision import RevisionEntry - - -class OriginEntry: - def __init__(self, url, revisions, id=None): - self.id = id - self.url = url - self.revisions = revisions - +from .model import OriginEntry, RevisionEntry ################################################################################ ################################################################################ class FileOriginIterator: """Iterator over origins present in the given CSV file.""" def __init__( self, filename: str, archive: ArchiveInterface, limit: Optional[int] = None ): self.file = open(filename) self.limit = limit self.archive = archive def __iter__(self): yield from iterate_statuses( [Origin(url.strip()) for url in self.file], self.archive, self.limit ) class ArchiveOriginIterator: """Iterator over origins present in the given storage.""" def __init__(self, archive: ArchiveInterface, limit: Optional[int] = None): self.limit = limit self.archive = archive def __iter__(self): yield from iterate_statuses( self.archive.iter_origins(), self.archive, self.limit ) def iterate_statuses(origins, archive: ArchiveInterface, limit: Optional[int] = None): idx = 0 for origin in origins: for visit in archive.iter_origin_visits(origin.url): for status in archive.iter_origin_visit_statuses(origin.url, visit.visit): snapshot = archive.snapshot_get_all_branches(status.snapshot) if snapshot is None: continue # TODO: may filter only those whose status is 'full'?? targets_set = set() releases_set = set() if snapshot is not None: for branch in snapshot.branches: if snapshot.branches[branch].target_type == TargetType.REVISION: targets_set.add(snapshot.branches[branch].target) elif ( snapshot.branches[branch].target_type == TargetType.RELEASE ): releases_set.add(snapshot.branches[branch].target) # This is done to keep the query in release_get small, hence avoiding # a timeout. batchsize = 100 - releases = list(releases_set) - while releases: - for release in archive.release_get(releases[:batchsize]): + while releases_set: + releases = [ + releases_set.pop() for i in range(batchsize) if releases_set + ] + for release in archive.release_get(releases): if release is not None: if release.target_type == ObjectType.REVISION: targets_set.add(release.target) - releases[:batchsize] = [] # This is done to keep the query in revision_get small, hence avoiding # a timeout. 
revisions = set() - targets = list(targets_set) - while targets: - for revision in archive.revision_get(targets[:batchsize]): + while targets_set: + targets = [ + targets_set.pop() for i in range(batchsize) if targets_set + ] + for revision in archive.revision_get(targets): if revision is not None: - parents = list( - map( - lambda id: RevisionEntry(archive, id), - revision.parents, - ) - ) - revisions.add( - RevisionEntry(archive, revision.id, parents=parents) - ) - targets[:batchsize] = [] + revisions.add(RevisionEntry(revision.id)) + # target_set |= set(revision.parents) yield OriginEntry(status.origin, list(revisions)) idx += 1 if idx == limit: return diff --git a/swh/provenance/postgresql/archive.py b/swh/provenance/postgresql/archive.py index 44eff62..15399bf 100644 --- a/swh/provenance/postgresql/archive.py +++ b/swh/provenance/postgresql/archive.py @@ -1,77 +1,77 @@ -from typing import Any, Dict, List +from typing import Any, Dict, Iterable, List from methodtools import lru_cache import psycopg2 class ArchivePostgreSQL: def __init__(self, conn: psycopg2.extensions.connection): self.conn = conn def directory_ls(self, id: bytes) -> List[Dict[str, Any]]: # TODO: only call directory_ls_internal if the id is not being queried by # someone else. Otherwise wait until results get properly cached. entries = self.directory_ls_internal(id) return entries @lru_cache(maxsize=1000000) def directory_ls_internal(self, id: bytes) -> List[Dict[str, Any]]: # TODO: add file size filtering with self.conn.cursor() as cursor: cursor.execute( """WITH dir AS (SELECT id AS dir_id, dir_entries, file_entries, rev_entries FROM directory WHERE id=%s), ls_d AS (SELECT dir_id, UNNEST(dir_entries) AS entry_id FROM dir), ls_f AS (SELECT dir_id, UNNEST(file_entries) AS entry_id FROM dir), ls_r AS (SELECT dir_id, UNNEST(rev_entries) AS entry_id FROM dir) (SELECT 'dir'::directory_entry_type AS type, e.target, e.name, NULL::sha1_git FROM ls_d LEFT JOIN directory_entry_dir e ON ls_d.entry_id=e.id) UNION (WITH known_contents AS (SELECT 'file'::directory_entry_type AS type, e.target, e.name, c.sha1_git FROM ls_f LEFT JOIN directory_entry_file e ON ls_f.entry_id=e.id INNER JOIN content c ON e.target=c.sha1_git) SELECT * FROM known_contents UNION (SELECT 'file'::directory_entry_type AS type, e.target, e.name, c.sha1_git FROM ls_f LEFT JOIN directory_entry_file e ON ls_f.entry_id=e.id LEFT JOIN skipped_content c ON e.target=c.sha1_git WHERE NOT EXISTS ( SELECT 1 FROM known_contents WHERE known_contents.sha1_git=e.target ) ) ) ORDER BY name """, (id,), ) return [ {"type": row[0], "target": row[1], "name": row[2]} for row in cursor.fetchall() ] def iter_origins(self): raise NotImplementedError def iter_origin_visits(self, origin: str): raise NotImplementedError def iter_origin_visit_statuses(self, origin: str, visit: int): raise NotImplementedError - def release_get(self, ids: List[bytes]): + def release_get(self, ids: Iterable[bytes]): raise NotImplementedError - def revision_get(self, ids: List[bytes]): + def revision_get(self, ids: Iterable[bytes]): raise NotImplementedError def snapshot_get_all_branches(self, snapshot: bytes): raise NotImplementedError diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py index d41397c..cb55dab 100644 --- a/swh/provenance/provenance.py +++ b/swh/provenance/provenance.py @@ -1,392 +1,416 @@ from datetime import datetime, timezone import os from typing import Dict, Generator, List, Optional, Tuple from typing_extensions import Protocol, runtime_checkable from .archive 
import ArchiveInterface -from .model import DirectoryEntry, FileEntry, TreeEntry -from .origin import OriginEntry -from .revision import RevisionEntry +from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry UTCMIN = datetime.min.replace(tzinfo=timezone.utc) @runtime_checkable class ProvenanceInterface(Protocol): def commit(self): """Commit currently ongoing transactions in the backend DB""" ... def content_add_to_directory( self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes ) -> None: ... def content_add_to_revision( self, revision: RevisionEntry, blob: FileEntry, prefix: bytes ) -> None: ... def content_find_first( self, blobid: bytes ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]: ... def content_find_all( self, blobid: bytes, limit: Optional[int] = None ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]: ... def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: ... def content_get_early_dates(self, blobs: List[FileEntry]) -> Dict[bytes, datetime]: ... def content_set_early_date(self, blob: FileEntry, date: datetime) -> None: ... def directory_add_to_revision( self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes ) -> None: ... def directory_get_date_in_isochrone_frontier( self, directory: DirectoryEntry ) -> Optional[datetime]: ... def directory_get_dates_in_isochrone_frontier( self, dirs: List[DirectoryEntry] ) -> Dict[bytes, datetime]: ... def directory_invalidate_in_isochrone_frontier( self, directory: DirectoryEntry ) -> None: ... def directory_set_date_in_isochrone_frontier( self, directory: DirectoryEntry, date: datetime ) -> None: ... def origin_get_id(self, origin: OriginEntry) -> int: ... def revision_add(self, revision: RevisionEntry) -> None: ... def revision_add_before_revision( self, relative: RevisionEntry, revision: RevisionEntry ) -> None: ... def revision_add_to_origin( self, origin: OriginEntry, revision: RevisionEntry ) -> None: ... def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]: ... def revision_get_preferred_origin(self, revision: RevisionEntry) -> int: ... def revision_in_history(self, revision: RevisionEntry) -> bool: ... def revision_set_preferred_origin( self, origin: OriginEntry, revision: RevisionEntry ) -> None: ... def revision_visited(self, revision: RevisionEntry) -> bool: ... def directory_process_content( - provenance: ProvenanceInterface, directory: DirectoryEntry, relative: DirectoryEntry + archive: ArchiveInterface, + provenance: ProvenanceInterface, + directory: DirectoryEntry, + relative: DirectoryEntry, ) -> None: stack = [(directory, b"")] while stack: current, prefix = stack.pop() - for child in iter(current): + for child in current.ls(archive): if isinstance(child, FileEntry): # Add content to the relative directory with the computed prefix. provenance.content_add_to_directory(relative, child, prefix) - else: + elif isinstance(child, DirectoryEntry): # Recursively walk the child directory. stack.append((child, os.path.join(prefix, child.name))) -def origin_add(provenance: ProvenanceInterface, origin: OriginEntry) -> None: +def origin_add( + archive: ArchiveInterface, provenance: ProvenanceInterface, origin: OriginEntry +) -> None: # TODO: refactor to iterate over origin visit statuses and commit only once # per status. 
origin.id = provenance.origin_get_id(origin) for revision in origin.revisions: - origin_add_revision(provenance, origin, revision) + origin_add_revision(archive, provenance, origin, revision) # Commit after each revision provenance.commit() # TODO: verify this! def origin_add_revision( - provenance: ProvenanceInterface, origin: OriginEntry, revision: RevisionEntry + archive: ArchiveInterface, + provenance: ProvenanceInterface, + origin: OriginEntry, + revision: RevisionEntry, ) -> None: stack: List[Tuple[Optional[RevisionEntry], RevisionEntry]] = [(None, revision)] while stack: relative, current = stack.pop() # Check if current revision has no preferred origin and update if necessary. preferred = provenance.revision_get_preferred_origin(current) if preferred is None: provenance.revision_set_preferred_origin(origin, current) ######################################################################## if relative is None: # This revision is pointed directly by the origin. visited = provenance.revision_visited(current) provenance.revision_add_to_origin(origin, current) if not visited: stack.append((current, current)) else: # This revision is a parent of another one in the history of the # relative revision. - for parent in iter(current): + for parent in current.parents(archive): visited = provenance.revision_visited(parent) if not visited: # The parent revision has never been seen before pointing # directly to an origin. known = provenance.revision_in_history(parent) if known: # The parent revision is already known in some other # revision's history. We should point it directly to # the origin and (eventually) walk its history. stack.append((None, parent)) else: # The parent revision was never seen before. We should # walk its history and associate it with the same # relative revision. provenance.revision_add_before_revision(relative, parent) stack.append((relative, parent)) else: # The parent revision already points to an origin, so its # history was properly processed before. We just need to # make sure it points to the current origin as well. provenance.revision_add_to_origin(origin, parent) def revision_add( provenance: ProvenanceInterface, archive: ArchiveInterface, revision: RevisionEntry, lower: bool = True, mindepth: int = 1, ) -> None: assert revision.date is not None assert revision.root is not None # Processed content starting from the revision's root directory. date = provenance.revision_get_early_date(revision) if date is None or revision.date < date: provenance.revision_add(revision) # TODO: add file size filtering revision_process_content( + archive, provenance, revision, - DirectoryEntry(archive, revision.root, b""), + DirectoryEntry(revision.root, b""), lower=lower, mindepth=mindepth, ) # TODO: improve this! Maybe using a max attempt counter? # Ideally Provenance class should guarantee that a commit never fails. 
while not provenance.commit(): continue class IsochroneNode: def __init__( - self, entry: TreeEntry, dates: Dict[bytes, datetime] = {}, depth: int = 0 + self, entry: DirectoryEntry, dates: Dict[bytes, datetime] = {}, depth: int = 0 ): self.entry = entry self.depth = depth self.date = dates.get(self.entry.id, None) self.children: List[IsochroneNode] = [] self.maxdate: Optional[datetime] = None def add_child( - self, child: TreeEntry, dates: Dict[bytes, datetime] = {} + self, child: DirectoryEntry, dates: Dict[bytes, datetime] = {} ) -> "IsochroneNode": assert isinstance(self.entry, DirectoryEntry) and self.date is None node = IsochroneNode(child, dates=dates, depth=self.depth + 1) self.children.append(node) return node def build_isochrone_graph( - provenance: ProvenanceInterface, revision: RevisionEntry, directory: DirectoryEntry + archive: ArchiveInterface, + provenance: ProvenanceInterface, + revision: RevisionEntry, + directory: DirectoryEntry, ) -> IsochroneNode: assert revision.date is not None + assert revision.root == directory.id + # Build the nodes structure root = IsochroneNode(directory) root.date = provenance.directory_get_date_in_isochrone_frontier(directory) stack = [root] while stack: current = stack.pop() assert isinstance(current.entry, DirectoryEntry) if current.date is None or current.date >= revision.date: # If current directory has an associated date in the isochrone frontier that # is greater or equal to the current revision's one, it should be ignored as # the revision is being processed out of order. if current.date is not None and current.date >= revision.date: provenance.directory_invalidate_in_isochrone_frontier(current.entry) current.date = None # Pre-query all known dates for content/directories in the current directory # for the provenance object to have them cached and (potentially) improve # performance. ddates = provenance.directory_get_dates_in_isochrone_frontier( - [child for child in current.entry if isinstance(child, DirectoryEntry)] + [ + child + for child in current.entry.ls(archive) + if isinstance(child, DirectoryEntry) + ] ) fdates = provenance.content_get_early_dates( - [child for child in current.entry if isinstance(child, FileEntry)] + [ + child + for child in current.entry.ls(archive) + if isinstance(child, FileEntry) + ] ) - for child in current.entry: + for child in current.entry.ls(archive): # Recursively analyse directory nodes. if isinstance(child, DirectoryEntry): node = current.add_child(child, dates=ddates) stack.append(node) else: current.add_child(child, dates=fdates) # Precalculate max known date for each node in the graph. stack = [root] while stack: current = stack.pop() if current.date is None: if any(map(lambda child: child.maxdate is None, current.children)): # Current node needs to be analysed again after its children. stack.append(current) for child in current.children: if isinstance(child.entry, FileEntry): if child.date is not None: # File node that has been seen before, just use its known # date. child.maxdate = child.date else: # File node that has never been seen before, use current # revision date. child.maxdate = revision.date else: # Recursively analyse directory nodes. stack.append(child) else: maxdates = [ child.maxdate for child in current.children if child.maxdate is not None # mostly to please mypy ] current.maxdate = max(maxdates) if maxdates else UTCMIN else: # Directory node in the frontier, just use its known date. 
current.maxdate = current.date return root def revision_process_content( + archive: ArchiveInterface, provenance: ProvenanceInterface, revision: RevisionEntry, root: DirectoryEntry, lower: bool = True, mindepth: int = 1, ): assert revision.date is not None - stack = [(build_isochrone_graph(provenance, revision, root), root.name)] + stack = [(build_isochrone_graph(archive, provenance, revision, root), root.name)] while stack: current, path = stack.pop() assert isinstance(current.entry, DirectoryEntry) if current.date is not None: assert current.date < revision.date # Current directory is an outer isochrone frontier for a previously # processed revision. It should be reused as is. provenance.directory_add_to_revision(revision, current.entry, path) else: # Current directory is not an outer isochrone frontier for any previous # revision. It might be eligible for this one. if is_new_frontier(current, revision, lower=lower, mindepth=mindepth): assert current.maxdate is not None # Outer frontier should be moved to current position in the isochrone # graph. This is the first time this directory is found in the isochrone # frontier. provenance.directory_set_date_in_isochrone_frontier( current.entry, current.maxdate ) provenance.directory_add_to_revision(revision, current.entry, path) directory_process_content( - provenance, directory=current.entry, relative=current.entry, + archive, + provenance, + directory=current.entry, + relative=current.entry, ) else: # No point moving the frontier here. Either there are no files or they # are being seen for the first time here. Add all blobs to current # revision updating date if necessary, and recursively analyse # subdirectories as candidates to the outer frontier. for child in current.children: if isinstance(child.entry, FileEntry): blob = child.entry if child.date is None or revision.date < child.date: provenance.content_set_early_date(blob, revision.date) provenance.content_add_to_revision(revision, blob, path) else: stack.append((child, os.path.join(path, child.entry.name))) def is_new_frontier( node: IsochroneNode, revision: RevisionEntry, lower: bool = True, mindepth: int = 1 ) -> bool: assert node.maxdate is not None and revision.date is not None # The only real condition for a directory to be a frontier is that its maxdate is # strictily less than current revision's date. Checking mindepth is meant to skip # root directories (or any arbitrary depth) to improve the result. The option lower # tries to maximize the reusage rate of previously defined frontiers by keeping them # low in the directory tree. return ( node.maxdate < revision.date # all content in node is already known and node.depth >= mindepth # current node is deeper than the min allowed depth and (has_blobs(node) if lower else True) # there is at least one blob in it ) def has_blobs(node: IsochroneNode) -> bool: # We may want to look for files in different ways to decide whether to define a # frontier or not: # 1. Only files in current node: return any(map(lambda child: isinstance(child.entry, FileEntry), node.children)) # 2. Files anywhere in the isochrone graph # stack = [node] # while stack: # current = stack.pop() # if any( # map(lambda child: isinstance(child.entry, FileEntry), current.children)): # return True # else: # # All children are directory entries. # stack.extend(current.children) # return False # 3. Files in the intermediate directories between current node and any previously # defined frontier: # TODO: complete this case! 
# return any( # map(lambda child: isinstance(child.entry, FileEntry), node.children) # ) or all( # map( # lambda child: ( # not (isinstance(child.entry, DirectoryEntry) and child.date is None) # ) # or has_blobs(child), # node.children, # ) # ) diff --git a/swh/provenance/revision.py b/swh/provenance/revision.py index 17d99bd..5adf3b9 100644 --- a/swh/provenance/revision.py +++ b/swh/provenance/revision.py @@ -1,186 +1,152 @@ from datetime import datetime, timezone -import iso8601 from itertools import islice import threading from typing import Iterable, Iterator, Optional, Tuple -from swh.model.hashutil import hash_to_bytes - -from .archive import ArchiveInterface - - -class RevisionEntry: - def __init__( - self, - archive: ArchiveInterface, - id: bytes, - date: Optional[datetime] = None, - root: Optional[bytes] = None, - parents: Optional[list] = None, - ): - self.archive = archive - self.id = id - self.date = date - assert self.date is None or self.date.tzinfo is not None - self.parents = parents - self.root = root - - def __iter__(self): - if self.parents is None: - self.parents = [] - for parent in self.archive.revision_get([self.id]): - if parent is not None: - self.parents.append( - RevisionEntry( - self.archive, - parent.id, - parents=[ - RevisionEntry(self.archive, id) for id in parent.parents - ], - ) - ) - - return iter(self.parents) +import iso8601 +from swh.model.hashutil import hash_to_bytes +from swh.provenance.archive import ArchiveInterface +from swh.provenance.model import RevisionEntry ######################################################################################## ######################################################################################## class CSVRevisionIterator: """Iterator over revisions typically present in the given CSV file. 
The input is an iterator that produces 3 elements per row: (id, date, root) where: - id: is the id (sha1_git) of the revision - date: is the author date - root: sha1 of the directory """ def __init__( self, revisions: Iterable[Tuple[bytes, datetime, bytes]], archive: ArchiveInterface, limit: Optional[int] = None, ): self.revisions: Iterator[Tuple[bytes, datetime, bytes]] if limit is not None: self.revisions = islice(revisions, limit) else: self.revisions = iter(revisions) self.mutex = threading.Lock() self.archive = archive def __iter__(self): return self def __next__(self): with self.mutex: id, date, root = next(self.revisions) date = iso8601.parse_date(date) if date.tzinfo is None: date = date.replace(tzinfo=timezone.utc) return RevisionEntry( - self.archive, hash_to_bytes(id), date=date, root=hash_to_bytes(root), + hash_to_bytes(id), date=date, root=hash_to_bytes(root), ) # class ArchiveRevisionIterator(RevisionIterator): # """Iterator over revisions present in the given database.""" # # def __init__(self, conn, limit=None, chunksize=100): # self.cur = conn.cursor() # self.chunksize = chunksize # self.records = [] # if limit is None: # self.cur.execute('''SELECT id, date, committer_date, directory # FROM revision''') # else: # self.cur.execute('''SELECT id, date, committer_date, directory # FROM revision # LIMIT %s''', (limit,)) # for row in self.cur.fetchmany(self.chunksize): # record = self.make_record(row) # if record is not None: # self.records.append(record) # self.mutex = threading.Lock() # # def __del__(self): # self.cur.close() # # def next(self): # self.mutex.acquire() # if not self.records: # self.records.clear() # for row in self.cur.fetchmany(self.chunksize): # record = self.make_record(row) # if record is not None: # self.records.append(record) # # if self.records: # revision, *self.records = self.records # self.mutex.release() # return revision # else: # self.mutex.release() # return None # # def make_record(self, row): # # Only revision with author or committer date are considered # if row[1] is not None: # # If the revision has author date, it takes precedence # return RevisionEntry(row[0], row[1], row[3]) # elif row[2] is not None: # # If not, we use the committer date # return RevisionEntry(row[0], row[2], row[3]) ######################################################################################## ######################################################################################## # class RevisionWorker(threading.Thread): # def __init__( # self, # id: int, # conninfo: dict, # archive: ArchiveInterface, # revisions: RevisionIterator # ): # from .provenance import get_provenance # # super().__init__() # self.archive = archive # self.id = id # self.provenance = get_provenance(conninfo) # self.revisions = revisions # # # def run(self): # from .provenance import revision_add # # # while True: # revision = self.revisions.next() # if revision is None: break # # processed = False # while not processed: # logging.info( # f'Thread {( # self.id # )} - Processing revision {( # hash_to_hex(revision.id) # )} (timestamp: {revision.date})' # ) # processed = revision_add(self.provenance, self.archive, revision) # if not processed: # logging.warning( # f'Thread {( # self.id # )} - Failed to process revision {( # hash_to_hex(revision.id) # )} (timestamp: {revision.date})' # ) diff --git a/swh/provenance/storage/archive.py b/swh/provenance/storage/archive.py index 06b7ce5..f3b31f3 100644 --- a/swh/provenance/storage/archive.py +++ b/swh/provenance/storage/archive.py @@ -1,47 +1,47 
@@ -from typing import Any, Dict, List +from typing import Any, Dict, Iterable, List # from functools import lru_cache from methodtools import lru_cache from swh.storage.interface import StorageInterface class ArchiveStorage: def __init__(self, storage: StorageInterface): self.storage = storage @lru_cache(maxsize=1000000) def directory_ls(self, id: bytes) -> List[Dict[str, Any]]: # TODO: filter unused fields return [entry for entry in self.storage.directory_ls(id)] def iter_origins(self): from swh.storage.algos.origin import iter_origins yield from iter_origins(self.storage) def iter_origin_visits(self, origin: str): from swh.storage.algos.origin import iter_origin_visits # TODO: filter unused fields yield from iter_origin_visits(self.storage, origin) def iter_origin_visit_statuses(self, origin: str, visit: int): from swh.storage.algos.origin import iter_origin_visit_statuses # TODO: filter unused fields yield from iter_origin_visit_statuses(self.storage, origin, visit) - def release_get(self, ids: List[bytes]): + def release_get(self, ids: Iterable[bytes]): # TODO: filter unused fields - yield from self.storage.release_get(ids) + yield from self.storage.release_get(list(ids)) - def revision_get(self, ids: List[bytes]): + def revision_get(self, ids: Iterable[bytes]): # TODO: filter unused fields - yield from self.storage.revision_get(ids) + yield from self.storage.revision_get(list(ids)) def snapshot_get_all_branches(self, snapshot: bytes): from swh.storage.algos.snapshot import snapshot_get_all_branches # TODO: filter unused fields return snapshot_get_all_branches(self.storage, snapshot) diff --git a/swh/provenance/tests/test_provenance_db.py b/swh/provenance/tests/test_provenance_db.py index 1ebedd7..5f39575 100644 --- a/swh/provenance/tests/test_provenance_db.py +++ b/swh/provenance/tests/test_provenance_db.py @@ -1,241 +1,233 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import pytest from swh.model.tests.swh_model_data import TEST_OBJECTS +from swh.provenance.model import RevisionEntry from swh.provenance.origin import OriginEntry from swh.provenance.provenance import origin_add, revision_add -from swh.provenance.revision import RevisionEntry +from swh.provenance.storage.archive import ArchiveStorage from swh.provenance.tests.conftest import synthetic_result def ts2dt(ts: dict) -> datetime.datetime: timestamp = datetime.datetime.fromtimestamp( ts["timestamp"]["seconds"], datetime.timezone(datetime.timedelta(minutes=ts["offset"])), ) return timestamp.replace(microsecond=ts["timestamp"]["microseconds"]) def test_provenance_origin_add(provenance, swh_storage_with_objects): """Test the ProvenanceDB.origin_add() method""" for origin in TEST_OBJECTS["origin"]: entry = OriginEntry(url=origin.url, revisions=[]) - origin_add(provenance, entry) + origin_add(ArchiveStorage(swh_storage_with_objects), provenance, entry) # TODO: check some facts here def test_provenance_add_revision(provenance, storage_and_CMDBTS, archive_pg): storage, data = storage_and_CMDBTS for i in range(2): # do it twice, there should be no change in results for revision in data["revision"]: entry = RevisionEntry( - archive_pg, id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], - parents=revision["parents"], ) revision_add(provenance, archive_pg, entry) # there should be as many entries 
in 'revision' as revisions from the # test dataset provenance.cursor.execute("SELECT count(*) FROM revision") assert provenance.cursor.fetchone()[0] == len(data["revision"]) # there should be no 'location' for the empty path provenance.cursor.execute("SELECT count(*) FROM location WHERE path=''") assert provenance.cursor.fetchone()[0] == 0 # there should be 32 'location' for non-empty path provenance.cursor.execute("SELECT count(*) FROM location WHERE path!=''") assert provenance.cursor.fetchone()[0] == 32 # there should be as many entries in 'revision' as revisions from the # test dataset provenance.cursor.execute("SELECT count(*) FROM revision") assert provenance.cursor.fetchone()[0] == len(data["revision"]) # 7 directories provenance.cursor.execute("SELECT count(*) FROM directory") assert provenance.cursor.fetchone()[0] == 7 # 12 D-R entries provenance.cursor.execute("SELECT count(*) FROM directory_in_rev") assert provenance.cursor.fetchone()[0] == 12 provenance.cursor.execute("SELECT count(*) FROM content") assert provenance.cursor.fetchone()[0] == len(data["content"]) provenance.cursor.execute("SELECT count(*) FROM content_in_dir") assert provenance.cursor.fetchone()[0] == 16 provenance.cursor.execute("SELECT count(*) FROM content_early_in_rev") assert provenance.cursor.fetchone()[0] == 13 def test_provenance_content_find_first(provenance, storage_and_CMDBTS, archive_pg): storage, data = storage_and_CMDBTS for revision in data["revision"]: entry = RevisionEntry( - archive_pg, - id=revision["id"], - date=ts2dt(revision["date"]), - root=revision["directory"], - parents=revision["parents"], + id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], ) revision_add(provenance, archive_pg, entry) first_expected_content = [ { "content": "43f3c871310a8e524004e91f033e7fb3b0bc8475", "rev": "35ccb8dd1b53d2d8a5c1375eb513ef2beaa79ae5", "date": 1609757158, "path": "README.md", }, { "content": "6dc7e44ead5c0e300fe94448c3e046dfe33ad4d1", "rev": "9e36e095b79e36a3da104ce272989b39cd68aefd", "date": 1610644094, "path": "Red/Blue/Green/a", }, { "content": "9f6e04be05297905f1275d3f4e0bb0583458b2e8", "rev": "bfbfcc72ae7fc35d6941386c36280512e6b38440", "date": 1610644097, "path": "Red/Blue/Green/b", }, { "content": "a28fa70e725ebda781e772795ca080cd737b823c", "rev": "0a31c9d509783abfd08f9fdfcd3acae20f17dfd0", "date": 1610644099, "path": "Red/Blue/c", }, { "content": "c0229d305adf3edf49f031269a70e3e87665fe88", "rev": "1d1fcf1816a8a2a77f9b1f342ba11d0fe9fd7f17", "date": 1610644105, "path": "Purple/d", }, { "content": "94ba40161084e8b80943accd9d24e1f9dd47189b", "rev": "55d4dc9471de6144f935daf3c38878155ca274d5", "date": 1610644113, "path": ("Dark/Brown/Purple/f", "Dark/Brown/Purple/g", "Dark/h"), # XXX }, { "content": "5e8f9ceaee9dafae2e3210e254fdf170295f8b5b", "rev": "a8939755d0be76cfea136e9e5ebce9bc51c49fef", "date": 1610644116, "path": "Dark/h", }, { "content": "bbd54b961764094b13f10cef733e3725d0a834c3", "rev": "ca1774a07b6e02c1caa7ae678924efa9259ee7c6", "date": 1610644118, "path": "Paris/i", }, { "content": "7ce4fe9a22f589fa1656a752ea371b0ebc2106b1", "rev": "611fe71d75b6ea151b06e3845c09777acc783d82", "date": 1610644120, "path": "Paris/j", }, { "content": "cb79b39935c9392fa5193d9f84a6c35dc9c22c75", "rev": "4c5551b4969eb2160824494d40b8e1f6187fc01e", "date": 1610644122, "path": "Paris/k", }, ] for expected in first_expected_content: contentid = bytes.fromhex(expected["content"]) (blob, rev, date, path) = provenance.content_find_first(contentid) if isinstance(expected["path"], tuple): assert 
bytes(path).decode() in expected["path"] else: assert bytes(path).decode() == expected["path"] assert bytes(blob) == contentid assert bytes(rev).hex() == expected["rev"] assert int(date.timestamp()) == expected["date"] @pytest.mark.parametrize( "syntheticfile, args", ( ("synthetic_noroot_lower.txt", {"lower": True, "mindepth": 1}), ("synthetic_noroot_upper.txt", {"lower": False, "mindepth": 1}), ), ) def test_provenance_db(provenance, storage_and_CMDBTS, archive_pg, syntheticfile, args): storage, data = storage_and_CMDBTS revisions = {rev["id"]: rev for rev in data["revision"]} rows = { "content": set(), "content_in_dir": set(), "content_early_in_rev": set(), "directory": set(), "directory_in_rev": set(), "location": set(), "revision": set(), } def db_count(table): provenance.cursor.execute(f"SELECT count(*) FROM {table}") return provenance.cursor.fetchone()[0] for synth_rev in synthetic_result(syntheticfile): revision = revisions[synth_rev["sha1"]] entry = RevisionEntry( - archive_pg, - id=revision["id"], - date=ts2dt(revision["date"]), - root=revision["directory"], - parents=revision["parents"], + id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], ) revision_add(provenance, archive_pg, entry, **args) + # import pdb; pdb.set_trace() # each "entry" in the synth file is one new revision rows["revision"].add(synth_rev["sha1"]) assert len(rows["revision"]) == db_count("revision") # this revision might have added new content objects rows["content"] |= set(x["dst"] for x in synth_rev["R_C"]) rows["content"] |= set(x["dst"] for x in synth_rev["D_C"]) assert len(rows["content"]) == db_count("content") # check for R-C (direct) entries rows["content_early_in_rev"] |= set( (x["src"], x["dst"], x["path"]) for x in synth_rev["R_C"] ) assert len(rows["content_early_in_rev"]) == db_count("content_early_in_rev") # check directories rows["directory"] |= set(x["dst"] for x in synth_rev["R_D"]) assert len(rows["directory"]) == db_count("directory") # check for R-D entries rows["directory_in_rev"] |= set( (x["src"], x["dst"], x["path"]) for x in synth_rev["R_D"] ) assert len(rows["directory_in_rev"]) == db_count("directory_in_rev") # check for D-C entries rows["content_in_dir"] |= set( (x["src"], x["dst"], x["path"]) for x in synth_rev["D_C"] ) assert len(rows["content_in_dir"]) == db_count("content_in_dir") # check for location entries rows["location"] |= set(x["path"] for x in synth_rev["R_C"]) rows["location"] |= set(x["path"] for x in synth_rev["D_C"]) rows["location"] |= set(x["path"] for x in synth_rev["R_D"]) assert len(rows["location"]) == db_count("location")
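
Note (not part of the patch): with DirectoryEntry.__iter__ replaced by ls(archive) and RevisionEntry.__iter__ by parents(archive), the archive handle is now passed at traversal time instead of being stored on the model objects. Below is a minimal sketch of how a caller drives the new ls() API, mirroring the explicit-stack walk in directory_process_content. The in-memory FakeArchive is an illustrative stand-in for ArchiveInterface (it only implements directory_ls); the directory ids and names are made up.

    from os.path import join
    from typing import Any, Dict, List

    from swh.provenance.model import DirectoryEntry, FileEntry


    class FakeArchive:
        """Illustrative stand-in for ArchiveInterface; only directory_ls is used."""

        def __init__(self, tree: Dict[bytes, List[Dict[str, Any]]]):
            self.tree = tree

        def directory_ls(self, id: bytes) -> List[Dict[str, Any]]:
            return self.tree.get(id, [])


    archive = FakeArchive(
        {
            b"rootdir": [
                {"type": "dir", "target": b"subdir", "name": b"src"},
                {"type": "file", "target": b"blob-1", "name": b"README"},
            ],
            b"subdir": [{"type": "file", "target": b"blob-2", "name": b"main.py"}],
        }
    )

    stack = [(DirectoryEntry(b"rootdir", b""), b"")]
    while stack:
        current, prefix = stack.pop()
        # ls() queries the archive lazily and caches the children on first use.
        for child in current.ls(archive):
            if isinstance(child, FileEntry):
                print(join(prefix, child.name), child.id)
            elif isinstance(child, DirectoryEntry):
                stack.append((child, join(prefix, child.name)))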
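
Note: iterate_statuses now drains releases_set/targets_set in place rather than copying them to lists, popping at most batchsize elements before each release_get/revision_get call so that each archive query stays small (the comments in the patch mention avoiding timeouts). A standalone sketch of that idiom follows; drain_in_batches and process_batch are illustrative names, not part of the codebase.

    from typing import Callable, Iterable, List, Optional, Set, TypeVar

    T = TypeVar("T")
    R = TypeVar("R")


    def drain_in_batches(
        pending: Set[T],
        process_batch: Callable[[List[T]], Iterable[Optional[R]]],
        batchsize: int = 100,
    ) -> List[R]:
        results: List[R] = []
        while pending:
            # Same shape as the patch: pop at most `batchsize` elements per round,
            # so the query sent to the archive stays bounded.
            batch = [pending.pop() for _ in range(batchsize) if pending]
            results.extend(item for item in process_batch(batch) if item is not None)
        return results


    # Trivial usage: the identity "query" just echoes the batch back.
    print(drain_in_batches({1, 2, 3, 4, 5}, lambda batch: batch, batchsize=2))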
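
Note: is_new_frontier keeps the same three-part condition after the refactoring: the node's maxdate must be strictly earlier than the revision date, its depth must be at least mindepth, and, when lower is set, it must contain at least one blob directly. A sketch restating the condition over plain values, independent of IsochroneNode; the helper name and the dates are illustrative only.

    from datetime import datetime, timezone


    def would_be_frontier(
        maxdate: datetime,
        revision_date: datetime,
        depth: int,
        has_blobs: bool,
        lower: bool = True,
        mindepth: int = 1,
    ) -> bool:
        return (
            maxdate < revision_date  # all content below is already known
            and depth >= mindepth  # skip the root (or any arbitrary depth)
            and (has_blobs if lower else True)  # keep the frontier low in the tree
        )


    rev_date = datetime(2021, 1, 14, tzinfo=timezone.utc)
    print(
        would_be_frontier(
            datetime(2021, 1, 1, tzinfo=timezone.utc), rev_date, depth=2, has_blobs=True
        )
    )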