diff --git a/requirements.txt b/requirements.txt
index d5ffdb9..a3cdae8 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,5 @@
 # Add here external Python modules dependencies, one per line. Module names
 # should match https://pypi.python.org/pypi names. For the full spec or
 # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html
 methodtools
+pytz
diff --git a/swh/provenance/cli.py b/swh/provenance/cli.py
index 77342e0..7da3d0c 100644
--- a/swh/provenance/cli.py
+++ b/swh/provenance/cli.py
@@ -1,200 +1,201 @@
 # Copyright (C) 2020  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 # WARNING: do not import unnecessary things here to keep cli startup time under
 # control
 import os
 from typing import Any, Dict, Optional

 import click
 import yaml

 from swh.core import config
 from swh.core.cli import CONTEXT_SETTINGS
 from swh.core.cli import swh as swh_cli_group
 from swh.model.hashutil import hash_to_bytes, hash_to_hex

 # All generic config code should reside in swh.core.config
 CONFIG_ENVVAR = "SWH_CONFIG_FILE"
 DEFAULT_CONFIG_PATH = os.path.join(click.get_app_dir("swh"), "global.yml")
 DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, DEFAULT_CONFIG_PATH)

 DEFAULT_CONFIG: Dict[str, Any] = {
     "archive": {
         "cls": "api",
         "storage": {
             "cls": "remote",
             "url": "http://uffizi.internal.softwareheritage.org:5002",
         }
-        # "cls": "local",
+        # "cls": "direct",
         # "db": {
         #     "host": "db.internal.softwareheritage.org",
         #     "dbname": "softwareheritage",
         #     "user": "guest"
         # }
     },
     "provenance": {"cls": "local", "db": {"host": "localhost", "dbname": "provenance"}},
 }


 CONFIG_FILE_HELP = f"""Configuration file:

 \b
 The CLI option or the environment variable will fail if invalid.
 CLI option is checked first.
 Then, environment variable {CONFIG_ENVVAR} is checked.
 Then, if the default path cannot be loaded, a set of default values is used.
 Default config path is {DEFAULT_CONFIG_PATH}.
 Default config values are:

 \b
 {yaml.dump(DEFAULT_CONFIG)}"""
 PROVENANCE_HELP = f"""Software Heritage Scanner tools.

 {CONFIG_FILE_HELP}"""


 @swh_cli_group.group(
     name="provenance", context_settings=CONTEXT_SETTINGS, help=PROVENANCE_HELP
 )
 @click.option(
     "-C",
     "--config-file",
     default=None,
     type=click.Path(exists=False, dir_okay=False, path_type=str),
     help="""YAML configuration file.""",
 )
 @click.option(
     "-P",
     "--profile",
     default=None,
     type=click.Path(exists=False, dir_okay=False, path_type=str),
     help="""Enable profiling to specified file.""",
 )
 @click.pass_context
 def cli(ctx, config_file: Optional[str], profile: str):
     if config_file is None and config.config_exists(DEFAULT_PATH):
         config_file = DEFAULT_PATH

     if config_file is None:
         conf = DEFAULT_CONFIG
     else:
         # read_raw_config does not fail on ENOENT
         if not config.config_exists(config_file):
             raise FileNotFoundError(config_file)
         conf = config.read_raw_config(config.config_basepath(config_file))
         conf = config.merge_configs(DEFAULT_CONFIG, conf)

     ctx.ensure_object(dict)
     ctx.obj["config"] = conf

     if profile:
         import atexit
         import cProfile

         print("Profiling...")

         pr = cProfile.Profile()
         pr.enable()

         def exit():
             pr.disable()
             pr.dump_stats(profile)

         atexit.register(exit)


 @cli.command(name="create", deprecated=True)
 @click.option("--maintenance-db", default=None)
 @click.option("--drop/--no-drop", "drop_db", default=False)
 @click.pass_context
 def create(ctx, maintenance_db, drop_db):
     """Deprecated, please use:

     swh db create provenance
     and
     swh db init provenance

     instead.
     """


 @cli.command(name="iter-revisions")
 @click.argument("filename")
 @click.option("-l", "--limit", type=int)
 @click.pass_context
 def iter_revisions(ctx, filename, limit):
     # TODO: add file size filtering
     """Process a provided list of revisions."""
     from . import get_archive, get_provenance
     from .provenance import revision_add
     from .revision import CSVRevisionIterator

     archive = get_archive(**ctx.obj["config"]["archive"])
     provenance = get_provenance(**ctx.obj["config"]["provenance"])
     revisions_provider = (
         line.strip().split(",") for line in open(filename, "r") if line.strip()
     )
     revisions = CSVRevisionIterator(revisions_provider, archive, limit=limit)

     for revision in revisions:
         revision_add(provenance, archive, revision)


 @cli.command(name="iter-origins")
 @click.argument("filename")
 @click.option("-l", "--limit", type=int)
 @click.pass_context
 def iter_origins(ctx, filename, limit):
     """Process a provided list of origins."""
     from . import get_archive, get_provenance
     from .origin import FileOriginIterator
     from .provenance import origin_add

     archive = get_archive(**ctx.obj["config"]["archive"])
     provenance = get_provenance(**ctx.obj["config"]["provenance"])

     for origin in FileOriginIterator(filename, archive, limit=limit):
         origin_add(provenance, origin)


 @cli.command(name="find-first")
 @click.argument("swhid")
 @click.pass_context
 def find_first(ctx, swhid):
     """Find first occurrence of the requested blob."""
     from . import get_provenance

     provenance = get_provenance(**ctx.obj["config"]["provenance"])
     # TODO: return a dictionary with proper keys for each field
     row = provenance.content_find_first(hash_to_bytes(swhid))
     if row is not None:
         print(
-            "{blob}, {rev}, {date}, {path}".format(
-                blob=hash_to_hex(row[0]),
+            "swh:1:cnt:{cnt}, swh:1:rev:{rev}, {date}, {path}".format(
+                cnt=hash_to_hex(row[0]),
                 rev=hash_to_hex(row[1]),
                 date=row[2],
                 path=os.fsdecode(row[3]),
             )
         )
     else:
         print(f"Cannot find a content with the id {swhid}")


 @cli.command(name="find-all")
 @click.argument("swhid")
+@click.option("-l", "--limit", type=int)
 @click.pass_context
-def find_all(ctx, swhid):
+def find_all(ctx, swhid, limit):
     """Find all occurrences of the requested blob."""
     from swh.provenance import get_provenance

     provenance = get_provenance(**ctx.obj["config"]["provenance"])
     # TODO: return a dictionary with proper keys for each field
-    for row in provenance.content_find_all(hash_to_bytes(swhid)):
+    for row in provenance.content_find_all(hash_to_bytes(swhid), limit=limit):
         print(
-            "{blob}, {rev}, {date}, {path}".format(
-                blob=hash_to_hex(row[0]),
+            "swh:1:cnt:{cnt}, swh:1:rev:{rev}, {date}, {path}".format(
+                cnt=hash_to_hex(row[0]),
                 rev=hash_to_hex(row[1]),
                 date=row[2],
                 path=os.fsdecode(row[3]),
             )
         )
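For a quick sense of the new `-l/--limit` flag on `find-all`, here is a minimal sketch driving it through Click's test runner, in the same spirit as the test suite further down; the content hash is a made-up placeholder, and the command still needs a reachable provenance database to return anything:

    from click.testing import CliRunner

    from swh.provenance.cli import cli

    # Hypothetical sha1 of a blob; find-all feeds it to hash_to_bytes().
    BLOB_SHA1 = "1f2e3d4c5b6a79881f2e3d4c5b6a79881f2e3d4c"

    runner = CliRunner()
    # Ask for at most 5 occurrences; --limit is forwarded to
    # content_find_all(), which appends a LIMIT clause to the query.
    result = runner.invoke(cli, ["find-all", "-l", "5", BLOB_SHA1])
    print(result.output)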
diff --git a/swh/provenance/postgresql/provenancedb_with_path.py b/swh/provenance/postgresql/provenancedb_with_path.py
index 49d3aaf..3b77b2b 100644
--- a/swh/provenance/postgresql/provenancedb_with_path.py
+++ b/swh/provenance/postgresql/provenancedb_with_path.py
@@ -1,192 +1,193 @@
 from datetime import datetime
 import itertools
 import operator
 import os
 from typing import Generator, Optional, Tuple

 import psycopg2
 import psycopg2.extras

 from ..model import DirectoryEntry, FileEntry
 from ..revision import RevisionEntry
 from .provenancedb_base import ProvenanceDBBase


 def normalize(path: bytes) -> bytes:
     return path[2:] if path.startswith(bytes("." + os.path.sep, "utf-8")) else path


 class ProvenanceWithPathDB(ProvenanceDBBase):
     def content_add_to_directory(
         self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes
     ):
         self.insert_cache["content_in_dir"].add(
             (blob.id, directory.id, normalize(os.path.join(prefix, blob.name)))
         )

     def content_add_to_revision(
         self, revision: RevisionEntry, blob: FileEntry, prefix: bytes
     ):
         self.insert_cache["content_early_in_rev"].add(
             (blob.id, revision.id, normalize(os.path.join(prefix, blob.name)))
         )

     def content_find_first(
         self, blobid: bytes
     ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]:
         self.cursor.execute(
             """SELECT content_location.sha1 AS blob,
                       revision.sha1 AS rev,
                       revision.date AS date,
                       content_location.path AS path
                FROM (SELECT content_hex.sha1,
                             content_hex.rev,
                             location.path
                      FROM (SELECT content.sha1,
                                   content_early_in_rev.rev,
                                   content_early_in_rev.loc
                            FROM content_early_in_rev
                            JOIN content
                              ON content.id=content_early_in_rev.blob
                            WHERE content.sha1=%s
                           ) AS content_hex
                      JOIN location
                        ON location.id=content_hex.loc
                     ) AS content_location
                JOIN revision
                  ON revision.id=content_location.rev
                ORDER BY date, rev, path ASC LIMIT 1""",
             (blobid,),
         )
         return self.cursor.fetchone()

     def content_find_all(
-        self, blobid: bytes
+        self, blobid: bytes, limit: Optional[int] = None
     ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]:
+        early_cut = f"LIMIT {limit}" if limit is not None else ""
         self.cursor.execute(
-            """(SELECT content_location.sha1 AS blob,
-                       revision.sha1 AS rev,
-                       revision.date AS date,
-                       content_location.path AS path
-                FROM (SELECT content_hex.sha1,
-                             content_hex.rev,
-                             location.path
-                      FROM (SELECT content.sha1,
-                                   content_early_in_rev.rev,
-                                   content_early_in_rev.loc
-                            FROM content_early_in_rev
-                            JOIN content
-                              ON content.id=content_early_in_rev.blob
-                            WHERE content.sha1=%s
-                           ) AS content_hex
-                      JOIN location
-                        ON location.id=content_hex.loc
-                     ) AS content_location
-                JOIN revision
-                  ON revision.id=content_location.rev
-               )
-               UNION
-               (SELECT content_prefix.sha1 AS blob,
-                       revision.sha1 AS rev,
-                       revision.date AS date,
-                       content_prefix.path AS path
-                FROM (SELECT content_in_rev.sha1,
-                             content_in_rev.rev,
-                             CASE location.path
-                               WHEN '' THEN content_in_rev.suffix
-                               WHEN '.' THEN content_in_rev.suffix
-                               ELSE (location.path || '/' ||
-                                         content_in_rev.suffix)::unix_path
-                             END AS path
-                      FROM (SELECT content_suffix.sha1,
-                                   directory_in_rev.rev,
-                                   directory_in_rev.loc,
-                                   content_suffix.path AS suffix
-                            FROM (SELECT content_hex.sha1,
-                                         content_hex.dir,
-                                         location.path
-                                  FROM (SELECT content.sha1,
-                                               content_in_dir.dir,
-                                               content_in_dir.loc
-                                        FROM content_in_dir
-                                        JOIN content
-                                          ON content_in_dir.blob=content.id
-                                        WHERE content.sha1=%s
-                                       ) AS content_hex
-                                  JOIN location
-                                    ON location.id=content_hex.loc
-                                 ) AS content_suffix
-                            JOIN directory_in_rev
-                              ON directory_in_rev.dir=content_suffix.dir
-                           ) AS content_in_rev
-                      JOIN location
-                        ON location.id=content_in_rev.loc
-                     ) AS content_prefix
-                JOIN revision
-                  ON revision.id=content_prefix.rev
-               )
-               ORDER BY date, rev, path""",
+            f"""(SELECT content_location.sha1 AS blob,
+                        revision.sha1 AS rev,
+                        revision.date AS date,
+                        content_location.path AS path
+                 FROM (SELECT content_hex.sha1,
+                              content_hex.rev,
+                              location.path
+                       FROM (SELECT content.sha1,
+                                    content_early_in_rev.rev,
+                                    content_early_in_rev.loc
+                             FROM content_early_in_rev
+                             JOIN content
+                               ON content.id=content_early_in_rev.blob
+                             WHERE content.sha1=%s
+                            ) AS content_hex
+                       JOIN location
+                         ON location.id=content_hex.loc
+                      ) AS content_location
+                 JOIN revision
+                   ON revision.id=content_location.rev
+                )
+                UNION
+                (SELECT content_prefix.sha1 AS blob,
+                        revision.sha1 AS rev,
+                        revision.date AS date,
+                        content_prefix.path AS path
+                 FROM (SELECT content_in_rev.sha1,
+                              content_in_rev.rev,
+                              CASE location.path
+                                WHEN '' THEN content_in_rev.suffix
+                                WHEN '.' THEN content_in_rev.suffix
+                                ELSE (location.path || '/' ||
+                                          content_in_rev.suffix)::unix_path
+                              END AS path
+                       FROM (SELECT content_suffix.sha1,
+                                    directory_in_rev.rev,
+                                    directory_in_rev.loc,
+                                    content_suffix.path AS suffix
+                             FROM (SELECT content_hex.sha1,
+                                          content_hex.dir,
+                                          location.path
+                                   FROM (SELECT content.sha1,
+                                                content_in_dir.dir,
+                                                content_in_dir.loc
+                                         FROM content_in_dir
+                                         JOIN content
+                                           ON content_in_dir.blob=content.id
+                                         WHERE content.sha1=%s
+                                        ) AS content_hex
+                                   JOIN location
+                                     ON location.id=content_hex.loc
+                                  ) AS content_suffix
+                             JOIN directory_in_rev
+                               ON directory_in_rev.dir=content_suffix.dir
+                            ) AS content_in_rev
+                       JOIN location
+                         ON location.id=content_in_rev.loc
+                      ) AS content_prefix
+                 JOIN revision
+                   ON revision.id=content_prefix.rev
+                )
+                ORDER BY date, rev, path {early_cut}""",
             (blobid, blobid),
         )
         # TODO: use POSTGRESQL EXPLAIN looking for query optimizations.
         yield from self.cursor.fetchall()

     def directory_add_to_revision(
         self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes
     ):
         self.insert_cache["directory_in_rev"].add(
             (directory.id, revision.id, normalize(path))
         )

     def insert_location(self, src0_table, src1_table, dst_table):
         # Resolve src0 ids
         src0_values = dict().fromkeys(
             map(operator.itemgetter(0), self.insert_cache[dst_table])
         )
         values = ", ".join(itertools.repeat("%s", len(src0_values)))
         self.cursor.execute(
             f"""SELECT sha1, id FROM {src0_table} WHERE sha1 IN ({values})""",
             tuple(src0_values),
         )
         src0_values = dict(self.cursor.fetchall())

         # Resolve src1 ids
         src1_values = dict().fromkeys(
             map(operator.itemgetter(1), self.insert_cache[dst_table])
         )
         values = ", ".join(itertools.repeat("%s", len(src1_values)))
         self.cursor.execute(
             f"""SELECT sha1, id FROM {src1_table} WHERE sha1 IN ({values})""",
             tuple(src1_values),
         )
         src1_values = dict(self.cursor.fetchall())

         # Resolve location ids
         location = dict().fromkeys(
             map(operator.itemgetter(2), self.insert_cache[dst_table])
         )
         location = dict(
             psycopg2.extras.execute_values(
                 self.cursor,
                 """LOCK TABLE ONLY location;
                    INSERT INTO location(path) VALUES %s
                    ON CONFLICT (path) DO
                      UPDATE SET path=EXCLUDED.path
                    RETURNING path, id""",
                 map(lambda path: (path,), location.keys()),
                 fetch=True,
             )
         )

         # Insert values in dst_table
         rows = map(
             lambda row: (src0_values[row[0]], src1_values[row[1]], location[row[2]]),
             self.insert_cache[dst_table],
         )
         psycopg2.extras.execute_values(
             self.cursor,
             f"""INSERT INTO {dst_table} VALUES %s
                 ON CONFLICT DO NOTHING""",
             rows,
         )
         self.insert_cache[dst_table].clear()
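A note on the `early_cut` interpolation added above: since `limit` is typed as `Optional[int]`, formatting it into the query string cannot inject arbitrary SQL. An alternative, sketched below under the assumption of a plain psycopg2 connection to the same schema, is to bind the limit as a regular query parameter; PostgreSQL treats `LIMIT NULL` as `LIMIT ALL`, so the `None` case needs no string juggling:

    import psycopg2

    # Hypothetical DSN; point it at an actual provenance database.
    conn = psycopg2.connect(dbname="provenance")
    with conn.cursor() as cursor:
        limit = 5  # or None: PostgreSQL reads LIMIT NULL as "no limit"
        cursor.execute(
            """SELECT sha1, date
               FROM revision
               ORDER BY date, sha1
               LIMIT %s""",
            (limit,),
        )
        for sha1, date in cursor.fetchall():
            print(bytes(sha1).hex(), date)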
diff --git a/swh/provenance/postgresql/provenancedb_without_path.py b/swh/provenance/postgresql/provenancedb_without_path.py
index 1ec6962..732e8b3 100644
--- a/swh/provenance/postgresql/provenancedb_without_path.py
+++ b/swh/provenance/postgresql/provenancedb_without_path.py
@@ -1,132 +1,133 @@
 from datetime import datetime
 import itertools
 import operator
 from typing import Generator, Optional, Tuple

 import psycopg2
 import psycopg2.extras

 from ..model import DirectoryEntry, FileEntry
 from ..revision import RevisionEntry
 from .provenancedb_base import ProvenanceDBBase

 ########################################################################################
 ########################################################################################
 ########################################################################################


 class ProvenanceWithoutPathDB(ProvenanceDBBase):
     def content_add_to_directory(
         self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes
     ):
         self.insert_cache["content_in_dir"].add((blob.id, directory.id))

     def content_add_to_revision(
         self, revision: RevisionEntry, blob: FileEntry, prefix: bytes
     ):
         self.insert_cache["content_early_in_rev"].add((blob.id, revision.id))

     def content_find_first(
         self, blobid: bytes
     ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]:
         self.cursor.execute(
             """SELECT revision.sha1 AS rev,
                       revision.date AS date
                FROM (SELECT content_early_in_rev.rev
                      FROM content_early_in_rev
                      JOIN content
                        ON content.id=content_early_in_rev.blob
                      WHERE content.sha1=%s
                     ) AS content_in_rev
                JOIN revision
                  ON revision.id=content_in_rev.rev
                ORDER BY date, rev ASC LIMIT 1""",
             (blobid,),
         )
         row = self.cursor.fetchone()
         if row is not None:
             # TODO: query revision from the archive and look for blobid into a
             # recursive directory_ls of the revision's root.
             return blobid, row[0], row[1], b""
         return None

     def content_find_all(
-        self, blobid: bytes
+        self, blobid: bytes, limit: Optional[int] = None
     ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]:
+        early_cut = f"LIMIT {limit}" if limit is not None else ""
         self.cursor.execute(
-            """(SELECT revision.sha1 AS rev,
-                       revision.date AS date
-                FROM (SELECT content_early_in_rev.rev
-                      FROM content_early_in_rev
-                      JOIN content
-                        ON content.id=content_early_in_rev.blob
-                      WHERE content.sha1=%s
-                     ) AS content_in_rev
-                JOIN revision
-                  ON revision.id=content_in_rev.rev
-               )
-               UNION
-               (SELECT revision.sha1 AS rev,
-                       revision.date AS date
-                FROM (SELECT directory_in_rev.rev
-                      FROM (SELECT content_in_dir.dir
-                            FROM content_in_dir
-                            JOIN content
-                              ON content_in_dir.blob=content.id
-                            WHERE content.sha1=%s
-                           ) AS content_dir
-                      JOIN directory_in_rev
-                        ON directory_in_rev.dir=content_dir.dir
-                     ) AS content_in_rev
-                JOIN revision
-                  ON revision.id=content_in_rev.rev
-               )
-               ORDER BY date, rev""",
+            f"""(SELECT revision.sha1 AS rev,
+                        revision.date AS date
+                 FROM (SELECT content_early_in_rev.rev
+                       FROM content_early_in_rev
+                       JOIN content
+                         ON content.id=content_early_in_rev.blob
+                       WHERE content.sha1=%s
+                      ) AS content_in_rev
+                 JOIN revision
+                   ON revision.id=content_in_rev.rev
+                )
+                UNION
+                (SELECT revision.sha1 AS rev,
+                        revision.date AS date
+                 FROM (SELECT directory_in_rev.rev
+                       FROM (SELECT content_in_dir.dir
+                             FROM content_in_dir
+                             JOIN content
+                               ON content_in_dir.blob=content.id
+                             WHERE content.sha1=%s
+                            ) AS content_dir
+                       JOIN directory_in_rev
+                         ON directory_in_rev.dir=content_dir.dir
+                      ) AS content_in_rev
+                 JOIN revision
+                   ON revision.id=content_in_rev.rev
+                )
+                ORDER BY date, rev {early_cut}""",
             (blobid, blobid),
         )
         # TODO: use POSTGRESQL EXPLAIN looking for query optimizations.
         for row in self.cursor.fetchall():
             # TODO: query revision from the archive and look for blobid into a
             # recursive directory_ls of the revision's root.
             yield blobid, row[0], row[1], b""

     def directory_add_to_revision(
         self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes
     ):
         self.insert_cache["directory_in_rev"].add((directory.id, revision.id))

     def insert_location(self, src0_table, src1_table, dst_table):
         # Resolve src0 ids
         src0_values = dict().fromkeys(
             map(operator.itemgetter(0), self.insert_cache[dst_table])
         )
         values = ", ".join(itertools.repeat("%s", len(src0_values)))
         self.cursor.execute(
             f"""SELECT sha1, id FROM {src0_table} WHERE sha1 IN ({values})""",
             tuple(src0_values),
         )
         src0_values = dict(self.cursor.fetchall())

         # Resolve src1 ids
         src1_values = dict().fromkeys(
             map(operator.itemgetter(1), self.insert_cache[dst_table])
         )
         values = ", ".join(itertools.repeat("%s", len(src1_values)))
         self.cursor.execute(
             f"""SELECT sha1, id FROM {src1_table} WHERE sha1 IN ({values})""",
             tuple(src1_values),
         )
         src1_values = dict(self.cursor.fetchall())

         # Insert values in dst_table
         rows = map(
             lambda row: (src0_values[row[0]], src1_values[row[1]]),
             self.insert_cache[dst_table],
         )
         psycopg2.extras.execute_values(
             self.cursor,
             f"""INSERT INTO {dst_table} VALUES %s
                 ON CONFLICT DO NOTHING""",
             rows,
         )
         self.insert_cache[dst_table].clear()
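One pattern worth calling out in the two `insert_location` implementations above is `psycopg2.extras.execute_values(..., fetch=True)` paired with a no-op `ON CONFLICT ... DO UPDATE ... RETURNING`: it resolves ids for both freshly inserted and pre-existing rows in a single round trip, whereas `ON CONFLICT DO NOTHING` would return nothing for the rows that already exist. A self-contained sketch against a scratch table (all names illustrative):

    import psycopg2
    import psycopg2.extras

    conn = psycopg2.connect(dbname="provenance")  # hypothetical DSN
    with conn, conn.cursor() as cur:
        cur.execute("CREATE TABLE IF NOT EXISTS loc (path text UNIQUE, id serial)")
        # Deduplicate first: ON CONFLICT DO UPDATE may not touch the same row
        # twice within a single statement.
        paths = {("src",), ("docs",), ("src",)}
        ids = dict(
            psycopg2.extras.execute_values(
                cur,
                """INSERT INTO loc(path) VALUES %s
                   ON CONFLICT (path) DO UPDATE SET path=EXCLUDED.path
                   RETURNING path, id""",
                paths,
                fetch=True,
            )
        )
        print(ids)  # e.g. {'src': 1, 'docs': 2}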
diff --git a/swh/provenance/provenance.gr.txt b/swh/provenance/provenance.gr.txt
deleted file mode 100644
index d8c345e..0000000
--- a/swh/provenance/provenance.gr.txt
+++ /dev/null
@@ -1,147 +0,0 @@
-def revision_process_content(
-    provenance: ProvenanceInterface, revision: RevisionEntry, directory: DirectoryEntry
-):
-    date = provenance.directory_get_date_in_isochrone_frontier(directory)
-    # stack = [(directory, directory.name)]
-
-    # XXX
-    # deal with revision root (if we still want to allow directory root in D-R table)
-    # should be managed in revision_add() with an "else" statment folowwing the "if"
-    # but I put it here because I need to be sure, that all directories in the outer
-    # frontier will be child of a directory (the root directory is a particular case)
-    if (date != None) and (date < revision.date):
-        # push directory root
-        provenance.directory_add_to_revision(revision, directory, directory)
-        stack = []
-    else:
-        stack = [(directory, date, directory.name)]
-
-    # used to store directory timestamp
-    TE = {}
-
-    while stack:
-        dir, date, path = stack.pop()
-
-        if dir.id in TE:
-            dateTE = TE[dir.id]
-            if (date == None) or (dateTE < date):
-                date = dateTE
-
-        dirs = [child for child in iter(dir) if isinstance(child, DirectoryEntry)]
-        dirdates = provenance.directory_get_early_dates(dirs)
-
-        nextstack = []
-
-        # XXX look for child directory with unkown timestamp
-
-        for child in dirs:
-            dateDir = dirdates.get(child.id, None)
-            dateTE = TE.get(child.dir, None)
-            if (dateTE != None) and ((dateDir == None) or (dateTE < dateDir)):
-                dirdates[child.id] = dateTE
-            if dateDir is None:
-                # we gonna have to process the directory to know max(dates)
-                nextstack.append((child, None, path / child.name))
-                TE[child.id] = None
-            elif dateDir > revision.date:
-                # directory seen earlier as part of the outer frontier
-                # need to be reset and manage as it was never seen before
-                nextstack.append((child, None, path / child.name))
-                TE[child.id] = None
-            elif dateDir == revision.date:
-                # directory of the inner frontier
-                # nothing to do here
-                pass
-            elif dateDir < revision.date:
-                # directory of the outer frontier
-                # nothing to do here
-                pass
-            else:
-                # should not happen
-                print("ERROR")
-
-        if nextstack:
-            # we have to proceed recursively
-            # we can't know max(dates)
-            stack.append((dir, date, path))
-            stack += nextstack
-            # order in the stack is important ...
-        else:
-            # otherwise proceed to determine max(dates)
-
-            # XXX we look for blob status
-
-            blobs = [child for child in iter(dir) if isinstance(child, FileEntry)]
-            blobdates = provenance.content_get_early_dates(blobs)
-            for child in blobs:
-                dateBlob = blobdates.get(child.id, None)
-                if dateBlob > revision.date:
-                    # content already found
-                    # but this revision is earliest
-                    blobdates[child.id] = None
-
-            # calculate max(dates)
-            if (len(blobs) + len(dirs)) == 0:
-                # empty dir
-                # return revision.dates
-                TE[dir.id] = revision.date
-                # and we are done
-            else:
-                maxdates = revision.date
-                for bdate in blobdates.values():
-                    if date is not None:
-                        maxdates = max(maxdates, bdate)
-                for ddate in dirdates.values():
-                    maxdates = max(maxdates, ddate)
-
-                # about the directory we are processing
-                if maxdates < revision.date:
-                    # this directory is outside the ischrone graph
-                    if stack:
-                        # not the root directory
-                        # all directories and blobs already known
-                        TE[dir.id] = maxdates
-                    else:
-                        # this is the root directory
-                        provenance.directory_add_to_revision(revision, dir, path)
-                        if date == None:
-                            # should the same as
-                            # provenance.directory_get_early_date(dir)==None
-                            directory_process_content(
-                                provenance,
-                                directory=dir,
-                                relative=dir,
-                                prefix=PosixPath("."),
-                            )
-                    if date == None or date > maxdates:
-                        provenance.directory_set_date_in_isochrone_frontier(
-                            dir, maxdates
-                        )  # ! make sure insert makes a min
-
-                elif maxdates == revision.date:
-                    # the current directory is in the inner isochrone frontier
-                    # that s where we can see directory nodes of the outer frontier
-                    for child in blobs:
-                        dateBlob = blobdates.get(child.id)
-                        if dateBlob is None:
-                            # unkown or reset
-                            provenance.content_set_early_date(child, revision.date)
-                            # ! make sure it makes a min inserting it,
-                            # if it already exists
-                            provenance.content_add_to_revision(revision, child, path)
-                    for child in dirs:
-                        dateDir = dirdates.get(child.id)
-                        if dateDir < revision.date:
-                            # this child directory is in the outer frontier
-                            provenance.directory_add_to_revision(revision, child, path)
-                            if provenance.directory_get_early_date(child) == None:
-                                # with this implementation you don't known if this is a
-                                # new one or a reset
-                                directory_process_content(
-                                    provenance,
-                                    directory=child,
-                                    relative=child,
-                                    prefix=PosixPath("."),
-                                )
-                        provenance.directory_set_date_in_isochrone_frontier(
-                            child, cdate
-                        )  # ! make sure insert = min
diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py
index 8427950..f48385f 100644
--- a/swh/provenance/provenance.py
+++ b/swh/provenance/provenance.py
@@ -1,373 +1,390 @@
 from datetime import datetime
 import os
+import pytz
 from typing import Dict, Generator, List, Optional, Tuple

 from typing_extensions import Protocol, runtime_checkable

 from .archive import ArchiveInterface
 from .model import DirectoryEntry, FileEntry, TreeEntry
 from .origin import OriginEntry
 from .revision import RevisionEntry

-# TODO: consider moving to path utils file together with normalize.
-def is_child(path: bytes, prefix: bytes) -> bool:
-    return path != prefix and os.path.dirname(path) == prefix
-
-
 @runtime_checkable
 class ProvenanceInterface(Protocol):
     def commit(self):
         """Commit currently ongoing transactions in the backend DB"""
         ...

     def content_add_to_directory(
         self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes
     ) -> None:
         ...

     def content_add_to_revision(
         self, revision: RevisionEntry, blob: FileEntry, prefix: bytes
     ) -> None:
         ...

     def content_find_first(
         self, blobid: bytes
     ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]:
         ...

     def content_find_all(
-        self, blobid: bytes
+        self, blobid: bytes, limit: Optional[int] = None
     ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]:
         ...

     def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]:
         ...

     def content_get_early_dates(self, blobs: List[FileEntry]) -> Dict[bytes, datetime]:
         ...

     def content_set_early_date(self, blob: FileEntry, date: datetime) -> None:
         ...

     def directory_add_to_revision(
         self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes
     ) -> None:
         ...

     def directory_get_date_in_isochrone_frontier(
         self, directory: DirectoryEntry
     ) -> Optional[datetime]:
         ...

     def directory_get_dates_in_isochrone_frontier(
         self, dirs: List[DirectoryEntry]
     ) -> Dict[bytes, datetime]:
         ...

     def directory_invalidate_in_isochrone_frontier(
         self, directory: DirectoryEntry
     ) -> None:
         ...

     def directory_set_date_in_isochrone_frontier(
         self, directory: DirectoryEntry, date: datetime
     ) -> None:
         ...

     def origin_get_id(self, origin: OriginEntry) -> int:
         ...

     def revision_add(self, revision: RevisionEntry) -> None:
         ...

     def revision_add_before_revision(
         self, relative: RevisionEntry, revision: RevisionEntry
     ) -> None:
         ...

     def revision_add_to_origin(
         self, origin: OriginEntry, revision: RevisionEntry
     ) -> None:
         ...

     def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]:
         ...

     def revision_get_preferred_origin(self, revision: RevisionEntry) -> int:
         ...

     def revision_in_history(self, revision: RevisionEntry) -> bool:
         ...

     def revision_set_preferred_origin(
         self, origin: OriginEntry, revision: RevisionEntry
     ) -> None:
         ...

     def revision_visited(self, revision: RevisionEntry) -> bool:
         ...


 def directory_process_content(
     provenance: ProvenanceInterface, directory: DirectoryEntry, relative: DirectoryEntry
 ) -> None:
     stack = [(directory, b"")]
     while stack:
         current, prefix = stack.pop()
         for child in iter(current):
             if isinstance(child, FileEntry):
                 # Add content to the relative directory with the computed prefix.
                 provenance.content_add_to_directory(relative, child, prefix)
             else:
                 # Recursively walk the child directory.
                 stack.append((child, os.path.join(prefix, child.name)))


 def origin_add(provenance: ProvenanceInterface, origin: OriginEntry) -> None:
     # TODO: refactor to iterate over origin visit statuses and commit only once
     # per status.
     origin.id = provenance.origin_get_id(origin)
     for revision in origin.revisions:
         origin_add_revision(provenance, origin, revision)
         # Commit after each revision
         provenance.commit()  # TODO: verify this!


 def origin_add_revision(
     provenance: ProvenanceInterface, origin: OriginEntry, revision: RevisionEntry
 ) -> None:
     stack: List[Tuple[Optional[RevisionEntry], RevisionEntry]] = [(None, revision)]
     while stack:
         relative, current = stack.pop()

         # Check if current revision has no preferred origin and update if necessary.
         preferred = provenance.revision_get_preferred_origin(current)

         if preferred is None:
             provenance.revision_set_preferred_origin(origin, current)
         ########################################################################

         if relative is None:
             # This revision is pointed directly by the origin.
             visited = provenance.revision_visited(current)
             provenance.revision_add_to_origin(origin, current)

             if not visited:
                 stack.append((current, current))
         else:
             # This revision is a parent of another one in the history of the
             # relative revision.
             for parent in iter(current):
                 visited = provenance.revision_visited(parent)

                 if not visited:
                     # The parent revision has never been seen before pointing
                     # directly to an origin.
                     known = provenance.revision_in_history(parent)

                     if known:
                         # The parent revision is already known in some other
                         # revision's history. We should point it directly to
                         # the origin and (eventually) walk its history.
                         stack.append((None, parent))
                     else:
                         # The parent revision was never seen before. We should
                         # walk its history and associate it with the same
                         # relative revision.
                         provenance.revision_add_before_revision(relative, parent)
                         stack.append((relative, parent))
                 else:
                     # The parent revision already points to an origin, so its
                     # history was properly processed before. We just need to
                     # make sure it points to the current origin as well.
                     provenance.revision_add_to_origin(origin, parent)


 def revision_add(
     provenance: ProvenanceInterface, archive: ArchiveInterface, revision: RevisionEntry
 ) -> None:
     assert revision.date is not None
     assert revision.root is not None
     # Processed content starting from the revision's root directory.
     date = provenance.revision_get_early_date(revision)
     if date is None or revision.date < date:
         provenance.revision_add(revision)
         # TODO: add file size filtering
         revision_process_content(
             provenance, revision, DirectoryEntry(archive, revision.root, b"")
         )
     # TODO: improve this! Maybe using a max attempt counter?
     # Ideally Provenance class should guarantee that a commit never fails.
     while not provenance.commit():
         continue


 class IsochroneNode:
     def __init__(self, entry: TreeEntry, dates: Dict[bytes, datetime] = {}):
         self.entry = entry
         self.date = dates.get(self.entry.id, None)
         self.children: List[IsochroneNode] = []
         self.maxdate: Optional[datetime] = None

     def add_child(
         self, child: TreeEntry, dates: Dict[bytes, datetime] = {}
     ) -> "IsochroneNode":
         assert isinstance(self.entry, DirectoryEntry) and self.date is None
         node = IsochroneNode(child, dates=dates)
         self.children.append(node)
         return node


 def build_isochrone_graph(
     provenance: ProvenanceInterface, revision: RevisionEntry, directory: DirectoryEntry
 ) -> IsochroneNode:
     assert revision.date is not None

     # Build the nodes structure
     root = IsochroneNode(directory)
     root.date = provenance.directory_get_date_in_isochrone_frontier(directory)
     stack = [root]
     while stack:
         current = stack.pop()
         assert isinstance(current.entry, DirectoryEntry)
         if current.date is None or current.date >= revision.date:
             # If current directory has an associated date in the isochrone frontier that
             # is greater or equal to the current revision's one, it should be ignored as
             # the revision is being processed out of order.
             if current.date is not None and current.date >= revision.date:
                 provenance.directory_invalidate_in_isochrone_frontier(current.entry)
                 current.date = None

             # Pre-query all known dates for content/directories in the current directory
             # for the provenance object to have them cached and (potentially) improve
             # performance.
             ddates = provenance.directory_get_dates_in_isochrone_frontier(
                 [child for child in current.entry if isinstance(child, DirectoryEntry)]
             )
             fdates = provenance.content_get_early_dates(
                 [child for child in current.entry if isinstance(child, FileEntry)]
             )
             for child in current.entry:
                 # Recursively analyse directory nodes.
                 if isinstance(child, DirectoryEntry):
                     node = current.add_child(child, dates=ddates)
                     stack.append(node)
                 else:
                     current.add_child(child, dates=fdates)

     # Precalculate max known date for each node in the graph.
     stack = [root]
     while stack:
         current = stack.pop()
         if current.date is None:
             if any(map(lambda child: child.maxdate is None, current.children)):
                 # Current node needs to be analysed again after its children.
                 stack.append(current)
                 for child in current.children:
                     if isinstance(child.entry, FileEntry):
                         if child.date is not None:
                             # File node that has been seen before, just use its known
                             # date.
                             child.maxdate = child.date
                         else:
                             # File node that has never been seen before, use current
                             # revision date.
                             child.maxdate = revision.date
                     else:
                         # Recursively analyse directory nodes.
                         stack.append(child)
             else:
                 maxdates = []
                 for child in current.children:
                     assert child.maxdate is not None
                     maxdates.append(child.maxdate)
-                current.maxdate = max(maxdates) if maxdates else datetime.min
+                current.maxdate = (
+                    max(maxdates) if maxdates else datetime.min.replace(tzinfo=pytz.UTC)
+                )
         else:
             # Directory node in the frontier, just use its known date.
             current.maxdate = current.date

     return root


 def revision_process_content(
     provenance: ProvenanceInterface, revision: RevisionEntry, root: DirectoryEntry
 ):
     assert revision.date is not None

     stack = [(build_isochrone_graph(provenance, revision, root), root.name)]
     while stack:
         current, path = stack.pop()
         assert isinstance(current.entry, DirectoryEntry)
         if current.date is not None:
             assert current.date < revision.date
             # Current directory is an outer isochrone frontier for a previously
             # processed revision. It should be reused as is.
             provenance.directory_add_to_revision(revision, current.entry, path)
         else:
             # Current directory is not an outer isochrone frontier for any previous
             # revision. It might be eligible for this one.
             if is_new_frontier(current, revision):
                 assert current.maxdate is not None
                 # Outer frontier should be moved to current position in the isochrone
                 # graph. This is the first time this directory is found in the isochrone
                 # frontier.
                 provenance.directory_set_date_in_isochrone_frontier(
                     current.entry, current.maxdate
                 )
                 provenance.directory_add_to_revision(revision, current.entry, path)
                 directory_process_content(
                     provenance,
                     directory=current.entry,
                     relative=current.entry,
                 )
             else:
                 # No point moving the frontier here. Either there are no files or they
                 # are being seen for the first time here. Add all blobs to current
                 # revision updating date if necessary, and recursively analyse
                 # subdirectories as candidates to the outer frontier.
                 for child in current.children:
                     if isinstance(child.entry, FileEntry):
                         blob = child.entry
                         if child.date is None or revision.date < child.date:
                             provenance.content_set_early_date(blob, revision.date)
                         provenance.content_add_to_revision(revision, blob, path)
                     else:
                         stack.append((child, os.path.join(path, child.entry.name)))


 def is_new_frontier(node: IsochroneNode, revision: RevisionEntry) -> bool:
     assert node.maxdate is not None and revision.date is not None
     # Using the following condition we should get an algorithm equivalent to the old
     # version, where frontiers are pushed up in the tree whenever possible.
-    return node.maxdate < revision.date
-    # return node.maxdate < revision.date and has_blobs(node)
+    # return node.maxdate < revision.date  # all content in node is already known
+
+    # Push frontiers up while forbidding them in the root directory of the revision.
+    # return (
+    #     node.maxdate < revision.date  # all content in node is already known
+    #     and node.entry.id != revision.root  # it is not the root directory
+    # )
+
+    # Keep frontiers down in the directory tree with the aim of maximizing their
+    # reuse.
+    # return (
+    #     node.maxdate < revision.date  # all content in node is already known
+    #     and has_blobs(node)  # there is at least one blob in it
+    # )
+
+    # Keep frontiers down and also forbid placing them in the root directory.
+    return (
+        node.maxdate < revision.date  # all content in node is already known
+        and node.entry.id != revision.root  # it is not the root directory
+        and has_blobs(node)  # there is at least one blob in it
+    )


 def has_blobs(node: IsochroneNode) -> bool:
     # We may want to look for files in different ways to decide whether to define a
     # frontier or not:
     # 1. Only files in current node:
     return any(map(lambda child: isinstance(child.entry, FileEntry), node.children))
     # 2. Files anywhere in the isochrone graph
     # stack = [node]
     # while stack:
     #     current = stack.pop()
     #     if any(
     #         map(lambda child: isinstance(child.entry, FileEntry), current.children)):
     #         return True
     #     else:
     #         # All children are directory entries.
     #         stack.extend(current.children)
     # return False
     # 3. Files in the intermediate directories between current node and any previously
     # defined frontier:
-    # TODO: complete this!
+    # TODO: complete this case!
     # return any(
     #     map(lambda child: isinstance(child.entry, FileEntry), node.children)
     # ) or all(
     #     map(
     #         lambda child: (
     #             not (isinstance(child.entry, DirectoryEntry) and child.date is None)
     #         )
     #         or has_blobs(child),
     #         node.children,
     #     )
     # )
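The `pytz` change above exists because revision dates coming from the archive are timezone-aware, and Python refuses to order naive against aware datetimes; a naive `datetime.min` sentinel would blow up as soon as it was compared with an aware revision date. A standalone sketch of the failure mode and of the sentinel now used in `build_isochrone_graph`:

    from datetime import datetime, timezone

    import pytz

    aware = datetime(2021, 3, 1, tzinfo=timezone.utc)  # an archive-style date

    try:
        _ = datetime.min < aware  # naive vs. aware
    except TypeError as err:
        print(err)  # can't compare offset-naive and offset-aware datetimes

    sentinel = datetime.min.replace(tzinfo=pytz.UTC)  # the new sentinel
    print(sentinel < aware)  # True: aware vs. aware compares fine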
monkeypatch.setenv("PGUSER", db_params["user"]) monkeypatch.setenv("PGPORT", db_params["port"]) result = CliRunner().invoke(swhmain, ["db", "create", "-d", dbname, "provenance"]) assert result.exit_code == 0, result.output # DB init using 'swh db init' result = CliRunner().invoke( swhmain, ["db", "init", "-d", dbname, "--flavor", flavor, "provenance"] ) assert result.exit_code == 0, result.output assert f"(flavor {flavor})" in result.output db_params["dbname"] = dbname cnx = psycopg2.connect(**db_params) # check the DB looks OK (check for db_flavor and expected tables) with cnx.cursor() as cur: cur.execute("select swh_get_dbflavor()") assert cur.fetchone() == (flavor,) cur.execute( "select table_name from information_schema.tables " "where table_schema = 'public' " f"and table_catalog = '{dbname}'" ) tables = set(x for (x,) in cur.fetchall()) assert tables == dbtables def test_cli_init_db_default_flavor(provenance_db): "Test that 'swh db init provenance' defaults to a with-path flavored DB" dbname = provenance_db.dsn result = CliRunner().invoke(swhmain, ["db", "init", "-d", dbname, "provenance"]) assert result.exit_code == 0, result.output with provenance_db.cursor() as cur: cur.execute("select swh_get_dbflavor()") assert cur.fetchone() == ("with-path",)