diff --git a/swh/provenance/cli.py b/swh/provenance/cli.py index e8850f0..989d1d8 100644 --- a/swh/provenance/cli.py +++ b/swh/provenance/cli.py @@ -1,200 +1,198 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # WARNING: do not import unnecessary things here to keep cli startup time under # control import os from typing import Any, Dict, Optional import click import yaml from swh.core import config from swh.core.cli import CONTEXT_SETTINGS from swh.core.cli import swh as swh_cli_group from swh.model.hashutil import hash_to_bytes, hash_to_hex # All generic config code should reside in swh.core.config CONFIG_ENVVAR = "SWH_CONFIG_FILE" DEFAULT_CONFIG_PATH = os.path.join(click.get_app_dir("swh"), "global.yml") DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, DEFAULT_CONFIG_PATH) DEFAULT_CONFIG: Dict[str, Any] = { "archive": { "cls": "api", "storage": { "cls": "remote", "url": "http://uffizi.internal.softwareheritage.org:5002", } # "cls": "direct", # "db": { # "host": "db.internal.softwareheritage.org", # "dbname": "softwareheritage", # "user": "guest" # } }, "provenance": {"cls": "local", "db": {"host": "localhost", "dbname": "provenance"}}, } CONFIG_FILE_HELP = f"""Configuration file: \b Loading fails if the file given by the CLI option or the environment variable is invalid. The CLI option is checked first, then the environment variable {CONFIG_ENVVAR}. Finally, if the default path cannot be loaded, a set of default values is used. Default config path is {DEFAULT_CONFIG_PATH}. Default config values are: \b {yaml.dump(DEFAULT_CONFIG)}""" PROVENANCE_HELP = f"""Software Heritage provenance tools. {CONFIG_FILE_HELP}""" @swh_cli_group.group( name="provenance", context_settings=CONTEXT_SETTINGS, help=PROVENANCE_HELP ) @click.option( "-C", "--config-file", default=None, type=click.Path(exists=False, dir_okay=False, path_type=str), help="""YAML configuration file.""", ) @click.option( "-P", "--profile", default=None, type=click.Path(exists=False, dir_okay=False, path_type=str), help="""Enable profiling to specified file.""", ) @click.pass_context def cli(ctx, config_file: Optional[str], profile: str): if config_file is None and config.config_exists(DEFAULT_PATH): config_file = DEFAULT_PATH if config_file is None: conf = DEFAULT_CONFIG else: # read_raw_config does not fail on ENOENT if not config.config_exists(config_file): raise FileNotFoundError(config_file) conf = config.read_raw_config(config.config_basepath(config_file)) conf = config.merge_configs(DEFAULT_CONFIG, conf) ctx.ensure_object(dict) ctx.obj["config"] = conf if profile: import atexit import cProfile print("Profiling...") pr = cProfile.Profile() pr.enable() def exit(): pr.disable() pr.dump_stats(profile) atexit.register(exit) @cli.command(name="iter-revisions") @click.argument("filename") @click.option("-a", "--track-all", default=True, type=bool) @click.option("-l", "--limit", type=int) @click.option("-m", "--min-depth", default=1, type=int) @click.option("-r", "--reuse", default=True, type=bool) @click.pass_context def iter_revisions(ctx, filename, track_all, limit, min_depth, reuse): # TODO: add file size filtering """Process a provided list of revisions.""" from . 
import get_archive, get_provenance from .revision import CSVRevisionIterator, revision_add archive = get_archive(**ctx.obj["config"]["archive"]) provenance = get_provenance(**ctx.obj["config"]["provenance"]) revisions_provider = ( line.strip().split(",") for line in open(filename, "r") if line.strip() ) revisions = CSVRevisionIterator(revisions_provider, limit=limit) for revision in revisions: revision_add( provenance, archive, [revision], trackall=track_all, lower=reuse, mindepth=min_depth, ) @cli.command(name="iter-origins") @click.argument("filename") @click.option("-l", "--limit", type=int) @click.pass_context def iter_origins(ctx, filename, limit): """Process a provided list of origins.""" from . import get_archive, get_provenance from .origin import CSVOriginIterator, origin_add archive = get_archive(**ctx.obj["config"]["archive"]) provenance = get_provenance(**ctx.obj["config"]["provenance"]) origins_provider = ( line.strip().split(",") for line in open(filename, "r") if line.strip() ) origins = CSVOriginIterator(origins_provider, limit=limit) for origin in origins: origin_add(provenance, archive, [origin]) @cli.command(name="find-first") @click.argument("swhid") @click.pass_context def find_first(ctx, swhid): """Find first occurrence of the requested blob.""" from . import get_provenance provenance = get_provenance(**ctx.obj["config"]["provenance"]) # TODO: return a dictionary with proper keys for each field - row = provenance.content_find_first(hash_to_bytes(swhid)) - if row is not None: + occur = provenance.content_find_first(hash_to_bytes(swhid)) + if occur is not None: print( - "swh:1:cnt:{cnt}, swh:1:rev:{rev}, {date}, {path}".format( - cnt=hash_to_hex(row[0]), - rev=hash_to_hex(row[1]), - date=row[2], - path=os.fsdecode(row[3]), - ) + f"swh:1:cnt:{hash_to_hex(occur.content)}, " + f"swh:1:rev:{hash_to_hex(occur.revision)}, " + f"{occur.date}, " + f"{occur.origin}, " + f"{os.fsdecode(occur.path)}" ) else: print(f"Cannot find a content with the id {swhid}") @cli.command(name="find-all") @click.argument("swhid") @click.option("-l", "--limit", type=int) @click.pass_context def find_all(ctx, swhid, limit): """Find all occurrences of the requested blob.""" from . import get_provenance provenance = get_provenance(**ctx.obj["config"]["provenance"]) # TODO: return a dictionary with proper keys for each field - for row in provenance.content_find_all(hash_to_bytes(swhid), limit=limit): + for occur in provenance.content_find_all(hash_to_bytes(swhid), limit=limit): print( - "swh:1:cnt:{cnt}, swh:1:rev:{rev}, {date}, {path}".format( - cnt=hash_to_hex(row[0]), - rev=hash_to_hex(row[1]), - date=row[2], - path=os.fsdecode(row[3]), - ) + f"swh:1:cnt:{hash_to_hex(occur.content)}, " + f"swh:1:rev:{hash_to_hex(occur.revision)}, " + f"{occur.date}, " + f"{occur.origin}, " + f"{os.fsdecode(occur.path)}" ) diff --git a/swh/provenance/origin.py b/swh/provenance/origin.py index a27508e..72996bf 100644 --- a/swh/provenance/origin.py +++ b/swh/provenance/origin.py @@ -1,101 +1,101 @@ from itertools import islice import logging import time from typing import Iterable, Iterator, List, Optional, Tuple from swh.model.model import Sha1Git from .archive import ArchiveInterface from .graph import HistoryNode, build_history_graph from .model import OriginEntry, RevisionEntry from .provenance import ProvenanceInterface class CSVOriginIterator: """Iterator over origin visit statuses typically present in the given CSV file. 
The input is an iterator that produces 2 elements per row: (url, snap) where: - url: is the origin url of the visit - snap: sha1_git of the snapshot pointed by the visit status """ def __init__( self, statuses: Iterable[Tuple[str, Sha1Git]], limit: Optional[int] = None, ): self.statuses: Iterator[Tuple[str, Sha1Git]] if limit is not None: self.statuses = islice(statuses, limit) else: self.statuses = iter(statuses) def __iter__(self): return (OriginEntry(url, snapshot) for url, snapshot in self.statuses) def origin_add( provenance: ProvenanceInterface, archive: ArchiveInterface, origins: List[OriginEntry], ): start = time.time() for origin in origins: provenance.origin_add(origin) origin.retrieve_revisions(archive) for revision in origin.revisions: graph = build_history_graph(archive, provenance, revision) origin_add_revision(provenance, origin, graph) done = time.time() - provenance.commit() + provenance.flush() stop = time.time() logging.debug( "Origins " ";".join([origin.id.hex() + ":" + origin.snapshot.hex() for origin in origins]) + f" were processed in {stop - start} secs (commit took {stop - done} secs)!" ) def origin_add_revision( provenance: ProvenanceInterface, origin: OriginEntry, graph: HistoryNode, ): # head is treated separately since it should always be added to the given origin head = graph.entry check_preferred_origin(provenance, origin, head) provenance.revision_add_to_origin(origin, head) # head's history should be recursively iterated starting from its parents stack = list(graph.parents) while stack: current = stack.pop() check_preferred_origin(provenance, origin, current.entry) if current.visited: # if current revision was already visited just add it to the current origin # and stop recursion (its history has already been flattened) provenance.revision_add_to_origin(origin, current.entry) else: # if current revision was not visited before create a link between it and # the head, and recursively walk its history provenance.revision_add_before_revision(head, current.entry) for parent in current.parents: stack.append(parent) def check_preferred_origin( provenance: ProvenanceInterface, origin: OriginEntry, revision: RevisionEntry, ): # if the revision has no preferred origin just set the given origin as the # preferred one. TODO: this should be improved in the future! preferred = provenance.revision_get_preferred_origin(revision) if preferred is None: provenance.revision_set_preferred_origin(origin, revision) diff --git a/swh/provenance/postgresql/provenancedb_base.py b/swh/provenance/postgresql/provenancedb_base.py index 01fd12a..daaf1c5 100644 --- a/swh/provenance/postgresql/provenancedb_base.py +++ b/swh/provenance/postgresql/provenancedb_base.py @@ -1,263 +1,263 @@ from datetime import datetime import itertools import logging -from typing import Any, Dict, Generator, List, Optional, Set, Tuple +from typing import Any, Dict, Generator, List, Mapping, Optional, Set, Tuple import psycopg2 import psycopg2.extras from swh.model.model import Sha1Git +from ..provenance import ProvenanceResult + class ProvenanceDBBase: def __init__(self, conn: psycopg2.extensions.connection): conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) conn.set_session(autocommit=True) self.conn = conn self.cursor = self.conn.cursor() # XXX: not sure this is the best place to do it! 
self.cursor.execute("SET timezone TO 'UTC'") self._flavor: Optional[str] = None @property def flavor(self) -> str: if self._flavor is None: self.cursor.execute("select swh_get_dbflavor()") self._flavor = self.cursor.fetchone()[0] assert self._flavor is not None return self._flavor @property def with_path(self) -> bool: return self.flavor == "with-path" - def commit(self, data: Dict[str, Any], raise_on_commit: bool = False) -> bool: + def commit(self, data: Mapping[str, Any], raise_on_commit: bool = False) -> bool: try: # First insert entities for entity in ("content", "directory", "revision"): self.insert_entity( entity, { sha1: data[entity]["data"][sha1] for sha1 in data[entity]["added"] }, ) data[entity]["data"].clear() data[entity]["added"].clear() # Relations should come after ids for entities were resolved for relation in ( "content_in_revision", "content_in_directory", "directory_in_revision", ): self.insert_relation(relation, data[relation]) # Insert origins self.insert_origin( { sha1: data["origin"]["data"][sha1] for sha1 in data["origin"]["added"] }, ) data["origin"]["data"].clear() data["origin"]["added"].clear() # Insert relations from the origin-revision layer self.insert_revision_history(data["revision_before_revision"]) self.insert_origin_head(data["revision_in_origin"]) # Update preferred origins self.update_preferred_origin( { sha1: data["revision_origin"]["data"][sha1] for sha1 in data["revision_origin"]["added"] } ) data["revision_origin"]["data"].clear() data["revision_origin"]["added"].clear() return True except: # noqa: E722 # Unexpected error occurred, rollback all changes and log message logging.exception("Unexpected error") if raise_on_commit: raise return False - def content_find_first( - self, id: Sha1Git - ) -> Optional[Tuple[Sha1Git, Sha1Git, datetime, bytes]]: + def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: ... def content_find_all( self, id: Sha1Git, limit: Optional[int] = None - ) -> Generator[Tuple[Sha1Git, Sha1Git, datetime, bytes], None, None]: + ) -> Generator[ProvenanceResult, None, None]: ... def get_dates(self, entity: str, ids: List[Sha1Git]) -> Dict[Sha1Git, datetime]: dates = {} if ids: values = ", ".join(itertools.repeat("%s", len(ids))) self.cursor.execute( f"""SELECT sha1, date FROM {entity} WHERE sha1 IN ({values})""", tuple(ids), ) dates.update(self.cursor.fetchall()) return dates def insert_entity(self, entity: str, data: Dict[Sha1Git, datetime]): if data: psycopg2.extras.execute_values( self.cursor, f""" LOCK TABLE ONLY {entity}; INSERT INTO {entity}(sha1, date) VALUES %s ON CONFLICT (sha1) DO UPDATE SET date=LEAST(EXCLUDED.date,{entity}.date) """, data.items(), ) # XXX: not sure if Python takes a reference or a copy. # This might be useless! data.clear() def insert_origin(self, data: Dict[Sha1Git, str]): if data: psycopg2.extras.execute_values( self.cursor, """ LOCK TABLE ONLY origin; INSERT INTO origin(sha1, url) VALUES %s ON CONFLICT DO NOTHING """, data.items(), ) # XXX: not sure if Python takes a reference or a copy. # This might be useless! 
data.clear() def insert_origin_head(self, data: Set[Tuple[Sha1Git, Sha1Git]]): if data: # Insert revisions first, to ensure "foreign keys" exist # Origins are assumed to be already inserted (they require knowing the url) psycopg2.extras.execute_values( self.cursor, """ LOCK TABLE ONLY revision; INSERT INTO revision(sha1) VALUES %s ON CONFLICT DO NOTHING """, {(rev,) for rev, _ in data}, ) psycopg2.extras.execute_values( self.cursor, # XXX: not clear how conflicts are handled here! """ LOCK TABLE ONLY revision_in_origin; INSERT INTO revision_in_origin SELECT R.id, O.id FROM (VALUES %s) AS V(rev, org) INNER JOIN revision AS R on (R.sha1=V.rev) INNER JOIN origin AS O on (O.sha1=V.org) ON CONFLICT DO NOTHING """, data, ) data.clear() def insert_relation(self, relation: str, data: Set[Tuple[Sha1Git, Sha1Git, bytes]]): ... def insert_revision_history(self, data: Dict[Sha1Git, Set[Sha1Git]]): if data: # print(f"Inserting histories: {data}") # Insert revisions first, to ensure "foreign keys" exist revisions = set(data) for rev in data: revisions.update(data[rev]) psycopg2.extras.execute_values( self.cursor, """ LOCK TABLE ONLY revision; INSERT INTO revision(sha1) VALUES %s ON CONFLICT DO NOTHING """, ((rev,) for rev in revisions), ) values = [[(prev, next) for next in data[prev]] for prev in data] psycopg2.extras.execute_values( self.cursor, # XXX: not clear how conflicts are handled here! """ LOCK TABLE ONLY revision_before_revision; INSERT INTO revision_before_revision SELECT P.id, N.id FROM (VALUES %s) AS V(prev, next) INNER JOIN revision AS P on (P.sha1=V.prev) INNER JOIN revision AS N on (N.sha1=V.next) ON CONFLICT DO NOTHING """, sum(values, []), ) data.clear() def revision_get_preferred_origin(self, revision: Sha1Git) -> Optional[Sha1Git]: self.cursor.execute( """ SELECT O.sha1 FROM revision AS R JOIN origin as O ON R.origin=O.id WHERE R.sha1=%s""", (revision,), ) row = self.cursor.fetchone() return row[0] if row is not None else None def revision_in_history(self, revision: Sha1Git) -> bool: self.cursor.execute( """ SELECT 1 FROM revision_before_revision JOIN revision ON revision.id=revision_before_revision.prev WHERE revision.sha1=%s """, (revision,), ) return self.cursor.fetchone() is not None def revision_visited(self, revision: Sha1Git) -> bool: self.cursor.execute( """ SELECT 1 FROM revision_in_origin JOIN revision ON revision.id=revision_in_origin.revision WHERE revision.sha1=%s """, (revision,), ) return self.cursor.fetchone() is not None def update_preferred_origin(self, data: Dict[Sha1Git, Sha1Git]): if data: # XXX: this is assuming the revision already exists in the db! It should # be improved by allowing null dates in the revision table. 
psycopg2.extras.execute_values( self.cursor, """ UPDATE revision R SET origin=O.id FROM (VALUES %s) AS V(rev, org) INNER JOIN origin AS O on (O.sha1=V.org) WHERE R.sha1=V.rev """, data.items(), ) data.clear() diff --git a/swh/provenance/postgresql/provenancedb_with_path.py b/swh/provenance/postgresql/provenancedb_with_path.py index 62c6f87..287f528 100644 --- a/swh/provenance/postgresql/provenancedb_with_path.py +++ b/swh/provenance/postgresql/provenancedb_with_path.py @@ -1,107 +1,120 @@ -from datetime import datetime from typing import Generator, Optional, Set, Tuple import psycopg2 import psycopg2.extras from swh.model.model import Sha1Git +from ..provenance import ProvenanceResult from .provenancedb_base import ProvenanceDBBase class ProvenanceWithPathDB(ProvenanceDBBase): - def content_find_first( - self, id: Sha1Git - ) -> Optional[Tuple[Sha1Git, Sha1Git, datetime, bytes]]: + def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: self.cursor.execute( """ SELECT C.sha1 AS blob, R.sha1 AS rev, R.date AS date, + O.url AS url, L.path AS path FROM content AS C - INNER JOIN content_in_revision AS CR ON (CR.content = C.id) - INNER JOIN location as L ON (CR.location = L.id) - INNER JOIN revision as R ON (CR.revision = R.id) + INNER JOIN content_in_revision AS CR ON (CR.content=C.id) + INNER JOIN location as L ON (CR.location=L.id) + INNER JOIN revision as R ON (CR.revision=R.id) + LEFT JOIN origin as O ON (R.origin=O.id) WHERE C.sha1=%s - ORDER BY date, rev, path ASC LIMIT 1 + ORDER BY date, rev, url, path ASC LIMIT 1 """, (id,), ) - return self.cursor.fetchone() + row = self.cursor.fetchone() + if row: + return ProvenanceResult( + content=row[0], revision=row[1], date=row[2], origin=row[3], path=row[4] + ) + else: + return None def content_find_all( self, id: Sha1Git, limit: Optional[int] = None - ) -> Generator[Tuple[Sha1Git, Sha1Git, datetime, bytes], None, None]: + ) -> Generator[ProvenanceResult, None, None]: early_cut = f"LIMIT {limit}" if limit is not None else "" self.cursor.execute( f""" (SELECT C.sha1 AS blob, R.sha1 AS rev, R.date AS date, + O.url AS url, L.path AS path FROM content AS C - INNER JOIN content_in_revision AS CR ON (CR.content = C.id) - INNER JOIN location AS L ON (CR.location = L.id) - INNER JOIN revision AS R ON (CR.revision = R.id) + INNER JOIN content_in_revision AS CR ON (CR.content=C.id) + INNER JOIN location AS L ON (CR.location=L.id) + INNER JOIN revision AS R ON (CR.revision=R.id) + LEFT JOIN origin as O ON (R.origin=O.id) WHERE C.sha1=%s) UNION (SELECT C.sha1 AS content, R.sha1 AS revision, R.date AS date, + O.url AS url, CASE DL.path WHEN '' THEN CL.path WHEN '.' 
THEN CL.path ELSE (DL.path || '/' || CL.path)::unix_path END AS path FROM content AS C - INNER JOIN content_in_directory AS CD ON (C.id = CD.content) - INNER JOIN directory_in_revision AS DR ON (CD.directory = DR.directory) - INNER JOIN revision AS R ON (DR.revision = R.id) - INNER JOIN location AS CL ON (CD.location = CL.id) - INNER JOIN location AS DL ON (DR.location = DL.id) + INNER JOIN content_in_directory AS CD ON (C.id=CD.content) + INNER JOIN directory_in_revision AS DR ON (CD.directory=DR.directory) + INNER JOIN revision AS R ON (DR.revision=R.id) + INNER JOIN location AS CL ON (CD.location=CL.id) + INNER JOIN location AS DL ON (DR.location=DL.id) + LEFT JOIN origin AS O ON (R.origin=O.id) WHERE C.sha1=%s) - ORDER BY date, rev, path {early_cut} + ORDER BY date, rev, url, path {early_cut} """, (id, id), ) - yield from self.cursor.fetchall() + for row in self.cursor.fetchall(): + yield ProvenanceResult( + content=row[0], revision=row[1], date=row[2], origin=row[3], path=row[4] + ) def insert_relation(self, relation: str, data: Set[Tuple[Sha1Git, Sha1Git, bytes]]): """Insert entries in `relation` from `data` Also insert missing location entries in the 'location' table. """ if data: assert relation in ( "content_in_revision", "content_in_directory", "directory_in_revision", ) src, dst = relation.split("_in_") # insert missing locations locations = tuple(set((loc,) for (_, _, loc) in data)) psycopg2.extras.execute_values( self.cursor, """ LOCK TABLE ONLY location; INSERT INTO location(path) VALUES %s ON CONFLICT (path) DO NOTHING """, locations, ) psycopg2.extras.execute_values( self.cursor, f""" LOCK TABLE ONLY {relation}; INSERT INTO {relation} SELECT {src}.id, {dst}.id, location.id FROM (VALUES %s) AS V(src, dst, path) INNER JOIN {src} on ({src}.sha1=V.src) INNER JOIN {dst} on ({dst}.sha1=V.dst) INNER JOIN location on (location.path=V.path) """, data, ) data.clear() diff --git a/swh/provenance/postgresql/provenancedb_without_path.py b/swh/provenance/postgresql/provenancedb_without_path.py index a55065a..4598577 100644 --- a/swh/provenance/postgresql/provenancedb_without_path.py +++ b/swh/provenance/postgresql/provenancedb_without_path.py @@ -1,83 +1,96 @@ -from datetime import datetime from typing import Generator, Optional, Set, Tuple import psycopg2 import psycopg2.extras from swh.model.model import Sha1Git +from ..provenance import ProvenanceResult from .provenancedb_base import ProvenanceDBBase class ProvenanceWithoutPathDB(ProvenanceDBBase): - def content_find_first( - self, id: Sha1Git - ) -> Optional[Tuple[Sha1Git, Sha1Git, datetime, bytes]]: + def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: self.cursor.execute( """ SELECT C.sha1 AS blob, R.sha1 AS rev, R.date AS date, + O.url AS url, '\\x'::bytea as path FROM content AS C - INNER JOIN content_in_revision AS CR ON (CR.content = C.id) - INNER JOIN revision as R ON (CR.revision = R.id) + INNER JOIN content_in_revision AS CR ON (CR.content=C.id) + INNER JOIN revision as R ON (CR.revision=R.id) + LEFT JOIN origin as O ON (R.origin=O.id) WHERE C.sha1=%s - ORDER BY date, rev ASC LIMIT 1 + ORDER BY date, rev, url ASC LIMIT 1 """, (id,), ) - return self.cursor.fetchone() + row = self.cursor.fetchone() + if row: + return ProvenanceResult( + content=row[0], revision=row[1], date=row[2], origin=row[3], path=row[4] + ) + else: + return None def content_find_all( self, id: Sha1Git, limit: Optional[int] = None - ) -> Generator[Tuple[Sha1Git, Sha1Git, datetime, bytes], None, None]: + ) -> Generator[ProvenanceResult, 
None, None]: early_cut = f"LIMIT {limit}" if limit is not None else "" self.cursor.execute( f""" (SELECT C.sha1 AS blob, R.sha1 AS rev, R.date AS date, + O.url AS url, '\\x'::bytea as path FROM content AS C - INNER JOIN content_in_revision AS CR ON (CR.content = C.id) - INNER JOIN revision AS R ON (CR.revision = R.id) + INNER JOIN content_in_revision AS CR ON (CR.content=C.id) + INNER JOIN revision AS R ON (CR.revision=R.id) + LEFT JOIN origin as O ON (R.origin=O.id) WHERE C.sha1=%s) UNION (SELECT C.sha1 AS content, R.sha1 AS revision, R.date AS date, + O.url AS url, '\\x'::bytea as path FROM content AS C - INNER JOIN content_in_directory AS CD ON (C.id = CD.content) - INNER JOIN directory_in_revision AS DR ON (CD.directory = DR.directory) - INNER JOIN revision AS R ON (DR.revision = R.id) + INNER JOIN content_in_directory AS CD ON (C.id=CD.content) + INNER JOIN directory_in_revision AS DR ON (CD.directory=DR.directory) + INNER JOIN revision AS R ON (DR.revision=R.id) + LEFT JOIN origin as O ON (R.origin=O.id) WHERE C.sha1=%s) - ORDER BY date, rev, path {early_cut} + ORDER BY date, rev, url {early_cut} """, (id, id), ) - yield from self.cursor.fetchall() + for row in self.cursor.fetchall(): + yield ProvenanceResult( + content=row[0], revision=row[1], date=row[2], origin=row[3], path=row[4] + ) def insert_relation(self, relation: str, data: Set[Tuple[Sha1Git, Sha1Git, bytes]]): if data: assert relation in ( "content_in_revision", "content_in_directory", "directory_in_revision", ) src, dst = relation.split("_in_") psycopg2.extras.execute_values( self.cursor, f""" LOCK TABLE ONLY {relation}; INSERT INTO {relation} SELECT {src}.id, {dst}.id FROM (VALUES %s) AS V(src, dst) INNER JOIN {src} on ({src}.sha1=V.src) INNER JOIN {dst} on ({dst}.sha1=V.dst) """, data, ) data.clear() diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py index e4bd5ac..1731ec4 100644 --- a/swh/provenance/provenance.py +++ b/swh/provenance/provenance.py @@ -1,309 +1,365 @@ from datetime import datetime import logging import os from typing import Dict, Generator, Iterable, List, Optional, Set, Tuple import psycopg2 from typing_extensions import Literal, Protocol, TypedDict, runtime_checkable from swh.model.model import Sha1Git from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry -# XXX: this protocol doesn't make much sense now that flavours have been delegated to -# another class, lower in the callstack. +class ProvenanceResult: + def __init__( + self, + content: Sha1Git, + revision: Sha1Git, + date: datetime, + origin: Optional[str], + path: bytes, + ) -> None: + self.content = content + self.revision = revision + self.date = date + self.origin = origin + self.path = path + + @runtime_checkable class ProvenanceInterface(Protocol): raise_on_commit: bool = False - def commit(self): - """Commit currently ongoing transactions in the backend DB""" + def flush(self) -> None: + """Flush internal cache to the underlying `storage`.""" ... def content_add_to_directory( self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes ) -> None: + """Associate `blob` with `directory` in the provenance model. `prefix` is the + relative path from `directory` to `blob` (excluding `blob`'s name). + """ ... def content_add_to_revision( self, revision: RevisionEntry, blob: FileEntry, prefix: bytes ) -> None: + """Associate `blob` with `revision` in the provenance model. `prefix` is the + absolute path from `revision`'s root directory to `blob` (excluding `blob`'s + name). + """ ... 
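The new ProvenanceResult record above replaces the bare (content, revision, date, path) tuples that content_find_first()/content_find_all() used to return. A minimal usage sketch of how a caller, e.g. the reworked find-first command, can now format a result by field name; the hashes, origin URL and path below are invented for illustration only:

from datetime import datetime, timezone

from swh.model.hashutil import hash_to_bytes, hash_to_hex
from swh.provenance.provenance import ProvenanceResult

# Build a result by hand just to show the field names; real code would get it
# from provenance.content_find_first() or content_find_all().
occur = ProvenanceResult(
    content=hash_to_bytes("be5973d6b943ad21ab48e537bbb5b8bd446c4034"),
    revision=hash_to_bytes("1c533587277731236616cac0d44f3b46c1da0f8a"),
    date=datetime(2021, 5, 17, tzinfo=timezone.utc),
    origin="https://example.org/repo.git",  # None when the revision has no known origin
    path=b"src/main.c",
)
print(
    f"swh:1:cnt:{hash_to_hex(occur.content)}, "
    f"swh:1:rev:{hash_to_hex(occur.revision)}, "
    f"{occur.date}, {occur.origin}, {occur.path.decode()}"
)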
- def content_find_first( - self, id: Sha1Git - ) -> Optional[Tuple[Sha1Git, Sha1Git, datetime, bytes]]: + def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: + """Retrieve the first occurrence of the blob identified by `id`.""" ... def content_find_all( self, id: Sha1Git, limit: Optional[int] = None - ) -> Generator[Tuple[Sha1Git, Sha1Git, datetime, bytes], None, None]: + ) -> Generator[ProvenanceResult, None, None]: + """Retrieve all the occurrences of the blob identified by `id`.""" ... def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: + """Retrieve the earliest known date of `blob`.""" ... def content_get_early_dates( self, blobs: Iterable[FileEntry] ) -> Dict[Sha1Git, datetime]: + """Retrieve the earliest known date for each blob in `blobs`. If some blob has + no associated date, it is not present in the resulting dictionary. + """ ... def content_set_early_date(self, blob: FileEntry, date: datetime) -> None: + """Associate `date` to `blob` as its earliest known date.""" ... def directory_add_to_revision( self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes ) -> None: + """Associate `directory` with `revision` in the provenance model. `path` is the + absolute path from `revision`'s root directory to `directory` (including + `directory`'s name). + """ ... def directory_get_date_in_isochrone_frontier( self, directory: DirectoryEntry ) -> Optional[datetime]: + """Retrieve the earliest known date of `directory` as an isochrone frontier in + the provenance model. + """ ... def directory_get_dates_in_isochrone_frontier( self, dirs: Iterable[DirectoryEntry] ) -> Dict[Sha1Git, datetime]: + """Retrieve the earliest known date for each directory in `dirs` as isochrone + frontiers in the provenance model. If some directory has no associated date, it is not + present in the resulting dictionary. + """ ... def directory_set_date_in_isochrone_frontier( self, directory: DirectoryEntry, date: datetime ) -> None: + """Associate `date` to `directory` as its earliest known date as an isochrone + frontier in the provenance model. + """ ... def origin_add(self, origin: OriginEntry) -> None: + """Add `origin` to the provenance model.""" ... def revision_add(self, revision: RevisionEntry) -> None: + """Add `revision` to the provenance model. This implies storing `revision`'s + date in the model, thus `revision.date` must be a valid date. + """ ... def revision_add_before_revision( - self, relative: RevisionEntry, revision: RevisionEntry + self, head: RevisionEntry, revision: RevisionEntry ) -> None: + """Associate `revision` to `head` as an ancestor of the latter.""" ... def revision_add_to_origin( self, origin: OriginEntry, revision: RevisionEntry ) -> None: + """Associate `revision` to `origin` as a head revision of the latter (i.e. the + target of a snapshot for `origin` in the archive).""" ... - def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]: + def revision_get_date(self, revision: RevisionEntry) -> Optional[datetime]: + """Retrieve the date associated to `revision`.""" ... def revision_get_preferred_origin( self, revision: RevisionEntry ) -> Optional[Sha1Git]: + """Retrieve the preferred origin associated to `revision`.""" ... def revision_in_history(self, revision: RevisionEntry) -> bool: + """Check if `revision` is known to be an ancestor of some head revision in the + provenance model. + """ ... 
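The docstrings above describe an "earliest known date" contract for blobs. As a hedged sketch (the record_occurrence helper and its empty prefix are illustrative, not part of this patch), this is how a caller honours that contract, following the same pattern revision_process_content() uses further down:

from swh.provenance.model import FileEntry, RevisionEntry
from swh.provenance.provenance import ProvenanceInterface


def record_occurrence(
    provenance: ProvenanceInterface, revision: RevisionEntry, blob: FileEntry
) -> None:
    # The model keeps the earliest date known for each blob: only move dates backwards.
    assert revision.date is not None
    known = provenance.content_get_early_date(blob)
    if known is None or revision.date < known:
        provenance.content_set_early_date(blob, revision.date)
    # Record the occurrence itself; the empty prefix means the blob sits at the root.
    provenance.content_add_to_revision(revision, blob, b"")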
def revision_set_preferred_origin( self, origin: OriginEntry, revision: RevisionEntry ) -> None: + """Associate `origin` as the preferred origin for `revision`.""" ... def revision_visited(self, revision: RevisionEntry) -> bool: + """Check if `revision` is known to be a head revision for some origin in the + provenance model. + """ ... class DatetimeCache(TypedDict): data: Dict[Sha1Git, Optional[datetime]] added: Set[Sha1Git] class OriginCache(TypedDict): data: Dict[Sha1Git, str] added: Set[Sha1Git] class RevisionCache(TypedDict): data: Dict[Sha1Git, Sha1Git] added: Set[Sha1Git] class ProvenanceCache(TypedDict): content: DatetimeCache directory: DatetimeCache revision: DatetimeCache # below are insertion caches only content_in_revision: Set[Tuple[Sha1Git, Sha1Git, bytes]] content_in_directory: Set[Tuple[Sha1Git, Sha1Git, bytes]] directory_in_revision: Set[Tuple[Sha1Git, Sha1Git, bytes]] # these two are for the origin layer origin: OriginCache revision_origin: RevisionCache revision_before_revision: Dict[Sha1Git, Set[Sha1Git]] revision_in_origin: Set[Tuple[Sha1Git, Sha1Git]] -def new_cache(): +def new_cache() -> ProvenanceCache: return ProvenanceCache( content=DatetimeCache(data={}, added=set()), directory=DatetimeCache(data={}, added=set()), revision=DatetimeCache(data={}, added=set()), content_in_revision=set(), content_in_directory=set(), directory_in_revision=set(), origin=OriginCache(data={}, added=set()), revision_origin=RevisionCache(data={}, added=set()), revision_before_revision={}, revision_in_origin=set(), ) # TODO: maybe move this to a separate file class ProvenanceBackend: raise_on_commit: bool = False def __init__(self, conn: psycopg2.extensions.connection): from .postgresql.provenancedb_base import ProvenanceDBBase # TODO: this class should not know what the actual used DB is. self.storage: ProvenanceDBBase flavor = ProvenanceDBBase(conn).flavor if flavor == "with-path": from .postgresql.provenancedb_with_path import ProvenanceWithPathDB self.storage = ProvenanceWithPathDB(conn) else: from .postgresql.provenancedb_without_path import ProvenanceWithoutPathDB self.storage = ProvenanceWithoutPathDB(conn) self.cache: ProvenanceCache = new_cache() - def clear_caches(self): + def clear_caches(self) -> None: self.cache = new_cache() - def commit(self): + def flush(self) -> None: # TODO: for now we just forward the cache. This should be improved! while not self.storage.commit(self.cache, raise_on_commit=self.raise_on_commit): logging.warning( - f"Unable to commit cached information {self.write_cache}. Retrying..." + f"Unable to commit cached information {self.cache}. Retrying..." 
) self.clear_caches() def content_add_to_directory( self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes - ): + ) -> None: self.cache["content_in_directory"].add( (blob.id, directory.id, normalize(os.path.join(prefix, blob.name))) ) def content_add_to_revision( self, revision: RevisionEntry, blob: FileEntry, prefix: bytes - ): + ) -> None: self.cache["content_in_revision"].add( (blob.id, revision.id, normalize(os.path.join(prefix, blob.name))) ) - def content_find_first( - self, id: Sha1Git - ) -> Optional[Tuple[Sha1Git, Sha1Git, datetime, bytes]]: + def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: return self.storage.content_find_first(id) def content_find_all( self, id: Sha1Git, limit: Optional[int] = None - ) -> Generator[Tuple[Sha1Git, Sha1Git, datetime, bytes], None, None]: + ) -> Generator[ProvenanceResult, None, None]: yield from self.storage.content_find_all(id, limit=limit) def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: return self.get_dates("content", [blob.id]).get(blob.id) def content_get_early_dates( self, blobs: Iterable[FileEntry] ) -> Dict[Sha1Git, datetime]: return self.get_dates("content", [blob.id for blob in blobs]) - def content_set_early_date(self, blob: FileEntry, date: datetime): + def content_set_early_date(self, blob: FileEntry, date: datetime) -> None: self.cache["content"]["data"][blob.id] = date self.cache["content"]["added"].add(blob.id) def directory_add_to_revision( self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes - ): + ) -> None: self.cache["directory_in_revision"].add( (directory.id, revision.id, normalize(path)) ) def directory_get_date_in_isochrone_frontier( self, directory: DirectoryEntry ) -> Optional[datetime]: return self.get_dates("directory", [directory.id]).get(directory.id) def directory_get_dates_in_isochrone_frontier( self, dirs: Iterable[DirectoryEntry] ) -> Dict[Sha1Git, datetime]: return self.get_dates("directory", [directory.id for directory in dirs]) def directory_set_date_in_isochrone_frontier( self, directory: DirectoryEntry, date: datetime - ): + ) -> None: self.cache["directory"]["data"][directory.id] = date self.cache["directory"]["added"].add(directory.id) def get_dates( self, entity: Literal["content", "revision", "directory"], ids: List[Sha1Git] ) -> Dict[Sha1Git, datetime]: cache = self.cache[entity] missing_ids = set(id for id in ids if id not in cache) if missing_ids: cache["data"].update(self.storage.get_dates(entity, list(missing_ids))) dates: Dict[Sha1Git, datetime] = {} for sha1 in ids: date = cache["data"].get(sha1) if date is not None: dates[sha1] = date return dates def origin_add(self, origin: OriginEntry) -> None: self.cache["origin"]["data"][origin.id] = origin.url self.cache["origin"]["added"].add(origin.id) - def revision_add(self, revision: RevisionEntry): + def revision_add(self, revision: RevisionEntry) -> None: self.cache["revision"]["data"][revision.id] = revision.date self.cache["revision"]["added"].add(revision.id) def revision_add_before_revision( - self, relative: RevisionEntry, revision: RevisionEntry - ): + self, head: RevisionEntry, revision: RevisionEntry + ) -> None: self.cache["revision_before_revision"].setdefault(revision.id, set()).add( - relative.id + head.id ) - def revision_add_to_origin(self, origin: OriginEntry, revision: RevisionEntry): + def revision_add_to_origin( + self, origin: OriginEntry, revision: RevisionEntry + ) -> None: self.cache["revision_in_origin"].add((revision.id, origin.id)) - def 
revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]: + def revision_get_date(self, revision: RevisionEntry) -> Optional[datetime]: return self.get_dates("revision", [revision.id]).get(revision.id) def revision_get_preferred_origin( self, revision: RevisionEntry ) -> Optional[Sha1Git]: cache = self.cache["revision_origin"] if revision.id not in cache: origin = self.storage.revision_get_preferred_origin(revision.id) if origin is not None: cache["data"][revision.id] = origin return cache["data"].get(revision.id) def revision_in_history(self, revision: RevisionEntry) -> bool: return revision.id in self.cache[ "revision_before_revision" ] or self.storage.revision_in_history(revision.id) def revision_set_preferred_origin( self, origin: OriginEntry, revision: RevisionEntry - ): + ) -> None: self.cache["revision_origin"]["data"][revision.id] = origin.id self.cache["revision_origin"]["added"].add(revision.id) def revision_visited(self, revision: RevisionEntry) -> bool: return revision.id in dict( self.cache["revision_in_origin"] ) or self.storage.revision_visited(revision.id) def normalize(path: bytes) -> bytes: return path[2:] if path.startswith(bytes("." + os.path.sep, "utf-8")) else path diff --git a/swh/provenance/revision.py b/swh/provenance/revision.py index 2f3e047..d857b21 100644 --- a/swh/provenance/revision.py +++ b/swh/provenance/revision.py @@ -1,256 +1,252 @@ from datetime import datetime, timezone from itertools import islice import logging import os import time from typing import Iterable, Iterator, List, Optional, Tuple import iso8601 from swh.model.hashutil import hash_to_bytes from swh.model.model import Sha1Git from .archive import ArchiveInterface from .graph import IsochroneNode, build_isochrone_graph from .model import DirectoryEntry, RevisionEntry from .provenance import ProvenanceInterface class CSVRevisionIterator: """Iterator over revisions typically present in the given CSV file. The input is an iterator that produces 3 elements per row: (id, date, root) where: - id: is the id (sha1_git) of the revision - date: is the author date - root: sha1 of the directory """ def __init__( self, revisions: Iterable[Tuple[Sha1Git, datetime, Sha1Git]], limit: Optional[int] = None, ): self.revisions: Iterator[Tuple[Sha1Git, datetime, Sha1Git]] if limit is not None: self.revisions = islice(revisions, limit) else: self.revisions = iter(revisions) def __iter__(self): return self def __next__(self): id, date, root = next(self.revisions) date = iso8601.parse_date(date) if date.tzinfo is None: date = date.replace(tzinfo=timezone.utc) return RevisionEntry( hash_to_bytes(id), date=date, root=hash_to_bytes(root), ) def revision_add( provenance: ProvenanceInterface, archive: ArchiveInterface, revisions: List[RevisionEntry], trackall: bool = True, lower: bool = True, mindepth: int = 1, commit: bool = True, ) -> None: start = time.time() for revision in revisions: assert revision.date is not None assert revision.root is not None # Processed content starting from the revision's root directory. - date = provenance.revision_get_early_date(revision) + date = provenance.revision_get_date(revision) if date is None or revision.date < date: logging.debug( f"Processing revisions {revision.id.hex()}" f" (known date {date} / revision date {revision.date})..." 
) graph = build_isochrone_graph( archive, provenance, revision, DirectoryEntry(revision.root), ) # TODO: add file size filtering revision_process_content( archive, provenance, revision, graph, trackall=trackall, lower=lower, mindepth=mindepth, ) done = time.time() if commit: - provenance.commit() + provenance.flush() stop = time.time() logging.debug( f"Revisions {';'.join([revision.id.hex() for revision in revisions])} " f" were processed in {stop - start} secs (commit took {stop - done} secs)!" ) - # logging.critical( - # ";".join([revision.id.hex() for revision in revisions]) - # + f",{stop - start},{stop - done}" - # ) def revision_process_content( archive: ArchiveInterface, provenance: ProvenanceInterface, revision: RevisionEntry, graph: IsochroneNode, trackall: bool = True, lower: bool = True, mindepth: int = 1, ): assert revision.date is not None provenance.revision_add(revision) stack = [graph] while stack: current = stack.pop() if current.dbdate is not None: assert current.dbdate <= revision.date if trackall: # Current directory is an outer isochrone frontier for a previously # processed revision. It should be reused as is. provenance.directory_add_to_revision( revision, current.entry, current.path ) else: assert current.maxdate is not None # Current directory is not an outer isochrone frontier for any previous # revision. It might be eligible for this one. if is_new_frontier( current, revision=revision, trackall=trackall, lower=lower, mindepth=mindepth, ): # Outer frontier should be moved to current position in the isochrone # graph. This is the first time this directory is found in the isochrone # frontier. provenance.directory_set_date_in_isochrone_frontier( current.entry, current.maxdate ) if trackall: provenance.directory_add_to_revision( revision, current.entry, current.path ) flatten_directory(archive, provenance, current.entry) else: # If current node is an invalidated frontier, update its date for future # revisions to get the proper value. if current.invalid: provenance.directory_set_date_in_isochrone_frontier( current.entry, current.maxdate ) # No point moving the frontier here. Either there are no files or they # are being seen for the first time here. Add all blobs to current # revision updating date if necessary, and recursively analyse # subdirectories as candidates to the outer frontier. for blob in current.entry.files: date = provenance.content_get_early_date(blob) if date is None or revision.date < date: provenance.content_set_early_date(blob, revision.date) provenance.content_add_to_revision(revision, blob, current.path) for child in current.children: stack.append(child) def flatten_directory( archive: ArchiveInterface, provenance: ProvenanceInterface, directory: DirectoryEntry, ) -> None: """Recursively retrieve all the files of 'directory' and insert them in the 'provenance' database in the 'content_to_directory' table. """ stack = [(directory, b"")] while stack: current, prefix = stack.pop() current.retrieve_children(archive) for f_child in current.files: # Add content to the directory with the computed prefix. provenance.content_add_to_directory(directory, f_child, prefix) for d_child in current.dirs: # Recursively walk the child directory. 
stack.append((d_child, os.path.join(prefix, d_child.name))) def is_new_frontier( node: IsochroneNode, revision: RevisionEntry, trackall: bool = True, lower: bool = True, mindepth: int = 1, ) -> bool: assert node.maxdate is not None # for mypy assert revision.date is not None # idem if trackall: # The only real condition for a directory to be a frontier is that its # content is already known and its maxdate is less than (or equal to) # the current revision's date. Checking mindepth is meant to skip root # directories (or any arbitrary depth) to improve the result. The # option lower tries to maximize the reuse rate of previously defined # frontiers by keeping them low in the directory tree. return ( node.known and node.maxdate <= revision.date # all content is earlier than revision and node.depth >= mindepth # current node is deeper than the min allowed depth and (has_blobs(node) if lower else True) # there is at least one blob in it ) else: # If we are only tracking first occurrences, we want to ensure that all first # occurrences end up in the content_early_in_rev relation. Thus, we force # every blob outside a frontier to have a strictly earlier date. return ( node.maxdate < revision.date # all content is earlier than revision and node.depth >= mindepth # deeper than the min allowed depth and (has_blobs(node) if lower else True) # there is at least one blob ) def has_blobs(node: IsochroneNode) -> bool: # We may want to look for files in different ways to decide whether to define a # frontier or not: # 1. Only files in current node: return any(node.entry.files) # 2. Files anywhere in the isochrone graph # stack = [node] # while stack: # current = stack.pop() # if any( # map(lambda child: isinstance(child.entry, FileEntry), current.children)): # return True # else: # # All children are directory entries. # stack.extend(current.children) # return False # 3. Files in the intermediate directories between current node and any previously # defined frontier: # TODO: complete this case! 
# return any( # map(lambda child: isinstance(child.entry, FileEntry), node.children) # ) or all( # map( # lambda child: ( # not (isinstance(child.entry, DirectoryEntry) and child.date is None) # ) # or has_blobs(child), # node.children, # ) # ) diff --git a/swh/provenance/tests/test_conftest.py b/swh/provenance/tests/test_conftest.py index f0725ec..8b9d23a 100644 --- a/swh/provenance/tests/test_conftest.py +++ b/swh/provenance/tests/test_conftest.py @@ -1,19 +1,19 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information def test_provenance_fixture(provenance): """Check the 'provenance' fixture produce a working ProvenanceDB object""" assert provenance - provenance.commit() # should be a noop + provenance.flush() # should be a noop def test_storage(swh_storage_with_objects): """Check the 'swh_storage_with_objects' fixture produce a working Storage object with at least some Content, Revision and Directory in it""" assert swh_storage_with_objects assert swh_storage_with_objects.content_get_random() assert swh_storage_with_objects.directory_get_random() assert swh_storage_with_objects.revision_get_random() diff --git a/swh/provenance/tests/test_history_graph.py b/swh/provenance/tests/test_history_graph.py index 3dc0037..091201a 100644 --- a/swh/provenance/tests/test_history_graph.py +++ b/swh/provenance/tests/test_history_graph.py @@ -1,62 +1,62 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest import yaml from swh.model.hashutil import hash_to_bytes from swh.provenance.graph import HistoryNode, build_history_graph from swh.provenance.model import OriginEntry, RevisionEntry from swh.provenance.origin import origin_add_revision from swh.provenance.tests.conftest import fill_storage, get_datafile, load_repo_data def history_graph_from_dict(d) -> HistoryNode: """Takes a dictionary representing a tree of HistoryNode objects, and recursively builds the corresponding graph.""" node = HistoryNode( entry=RevisionEntry(hash_to_bytes(d["rev"])), visited=d.get("visited", False), in_history=d.get("in_history", False), ) node.parents = set( history_graph_from_dict(parent) for parent in d.get("parents", []) ) return node @pytest.mark.parametrize( "repo, visit", (("with-merges", "visits-01"),), ) @pytest.mark.parametrize("batch", (True, False)) def test_history_graph(provenance, swh_storage, archive, repo, visit, batch): # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) fill_storage(swh_storage, data) filename = f"history_graphs_{repo}_{visit}.yaml" with open(get_datafile(filename)) as file: for expected in yaml.full_load(file): entry = OriginEntry(expected["origin"], hash_to_bytes(expected["snapshot"])) provenance.origin_add(entry) for graph_as_dict in expected["graphs"]: expected_graph = history_graph_from_dict(graph_as_dict) print("Expected graph:", expected_graph) computed_graph = build_history_graph( archive, provenance, RevisionEntry(hash_to_bytes(graph_as_dict["rev"])), ) print("Computed graph:", computed_graph) assert computed_graph == expected_graph origin_add_revision(provenance, entry, computed_graph) if not batch: - provenance.commit() + 
provenance.flush() diff --git a/swh/provenance/tests/test_provenance_heuristics.py b/swh/provenance/tests/test_provenance_heuristics.py index 90af26b..19dde8c 100644 --- a/swh/provenance/tests/test_provenance_heuristics.py +++ b/swh/provenance/tests/test_provenance_heuristics.py @@ -1,347 +1,350 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Dict, List, Tuple import pytest from swh.model.hashutil import hash_to_bytes from swh.provenance.model import RevisionEntry from swh.provenance.revision import revision_add from swh.provenance.tests.conftest import ( fill_storage, get_datafile, load_repo_data, synthetic_result, ) from swh.provenance.tests.test_provenance_db import ts2dt def sha1s(cur, table): """return the 'sha1' column from the DB 'table' (as hex) 'cur' is a cursor to the provenance index DB. """ cur.execute(f"SELECT sha1 FROM {table}") return set(sha1.hex() for (sha1,) in cur.fetchall()) def locations(cur): """return the 'path' column from the DB location table 'cur' is a cursor to the provenance index DB. """ cur.execute("SELECT encode(location.path::bytea, 'escape') FROM location") return set(x for (x,) in cur.fetchall()) def relations(cur, src, dst): """return the triplets ('sha1', 'sha1', 'path') from the DB for the relation between 'src' table and 'dst' table (i.e. for C-R, C-D and D-R relations). 'cur' is a cursor to the provenance index DB. """ relation = f"{src}_in_{dst}" cur.execute("select swh_get_dbflavor()") with_path = cur.fetchone()[0] == "with-path" # note that the columns have the same name as the relations they refer to, # so we can write things like "rel.{dst}=src.id" in the query below if with_path: cur.execute( f""" SELECT encode(src.sha1::bytea, 'hex'), encode(dst.sha1::bytea, 'hex'), encode(location.path::bytea, 'escape') FROM {relation} as relation INNER JOIN {src} AS src ON (relation.{src} = src.id) INNER JOIN {dst} AS dst ON (relation.{dst} = dst.id) INNER JOIN location ON (relation.location = location.id) """ ) else: cur.execute( f""" SELECT encode(src.sha1::bytea, 'hex'), encode(dst.sha1::bytea, 'hex'), '' FROM {relation} as relation INNER JOIN {src} AS src ON (src.id = relation.{src}) INNER JOIN {dst} AS dst ON (dst.id = relation.{dst}) """ ) return set(cur.fetchall()) def get_timestamp(cur, table, sha1): """return the date for the 'sha1' from the DB 'table' (as hex) 'cur' is a cursor to the provenance index DB. 
""" if isinstance(sha1, str): sha1 = hash_to_bytes(sha1) cur.execute(f"SELECT date FROM {table} WHERE sha1=%s", (sha1,)) return [date.timestamp() for (date,) in cur.fetchall()] @pytest.mark.parametrize( "repo, lower, mindepth", ( ("cmdbts2", True, 1), ("cmdbts2", False, 1), ("cmdbts2", True, 2), ("cmdbts2", False, 2), ("out-of-order", True, 1), ), ) def test_provenance_heuristics(provenance, swh_storage, archive, repo, lower, mindepth): # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) fill_storage(swh_storage, data) syntheticfile = get_datafile( f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt" ) revisions = {rev["id"]: rev for rev in data["revision"]} rows = { "content": set(), "content_in_directory": set(), "content_in_revision": set(), "directory": set(), "directory_in_revision": set(), "location": set(), "revision": set(), } cursor = provenance.storage.cursor def maybe_path(path: str) -> str: if provenance.storage.with_path: return path return "" for synth_rev in synthetic_result(syntheticfile): revision = revisions[synth_rev["sha1"]] entry = RevisionEntry( id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], ) revision_add(provenance, archive, [entry], lower=lower, mindepth=mindepth) # each "entry" in the synth file is one new revision rows["revision"].add(synth_rev["sha1"].hex()) assert rows["revision"] == sha1s(cursor, "revision"), synth_rev["msg"] # check the timestamp of the revision rev_ts = synth_rev["date"] assert get_timestamp(cursor, "revision", synth_rev["sha1"].hex()) == [ rev_ts ], synth_rev["msg"] # this revision might have added new content objects rows["content"] |= set(x["dst"].hex() for x in synth_rev["R_C"]) rows["content"] |= set(x["dst"].hex() for x in synth_rev["D_C"]) assert rows["content"] == sha1s(cursor, "content"), synth_rev["msg"] # check for R-C (direct) entries # these are added directly in the content_early_in_rev table rows["content_in_revision"] |= set( (x["dst"].hex(), x["src"].hex(), maybe_path(x["path"])) for x in synth_rev["R_C"] ) assert rows["content_in_revision"] == relations( cursor, "content", "revision" ), synth_rev["msg"] # check timestamps for rc in synth_rev["R_C"]: assert get_timestamp(cursor, "content", rc["dst"]) == [ rev_ts + rc["rel_ts"] ], synth_rev["msg"] # check directories # each directory stored in the provenance index is an entry # in the "directory" table... rows["directory"] |= set(x["dst"].hex() for x in synth_rev["R_D"]) assert rows["directory"] == sha1s(cursor, "directory"), synth_rev["msg"] # ... + a number of rows in the "directory_in_rev" table... # check for R-D entries rows["directory_in_revision"] |= set( (x["dst"].hex(), x["src"].hex(), maybe_path(x["path"])) for x in synth_rev["R_D"] ) assert rows["directory_in_revision"] == relations( cursor, "directory", "revision" ), synth_rev["msg"] # check timestamps for rd in synth_rev["R_D"]: assert get_timestamp(cursor, "directory", rd["dst"]) == [ rev_ts + rd["rel_ts"] ], synth_rev["msg"] # ... + a number of rows in the "content_in_dir" table # for content of the directory. 
# check for D-C entries rows["content_in_directory"] |= set( (x["dst"].hex(), x["src"].hex(), maybe_path(x["path"])) for x in synth_rev["D_C"] ) assert rows["content_in_directory"] == relations( cursor, "content", "directory" ), synth_rev["msg"] # check timestamps for dc in synth_rev["D_C"]: assert get_timestamp(cursor, "content", dc["dst"]) == [ rev_ts + dc["rel_ts"] ], synth_rev["msg"] if provenance.storage.with_path: # check for location entries rows["location"] |= set(x["path"] for x in synth_rev["R_C"]) rows["location"] |= set(x["path"] for x in synth_rev["D_C"]) rows["location"] |= set(x["path"] for x in synth_rev["R_D"]) assert rows["location"] == locations(cursor), synth_rev["msg"] @pytest.mark.parametrize( "repo, lower, mindepth", ( ("cmdbts2", True, 1), ("cmdbts2", False, 1), ("cmdbts2", True, 2), ("cmdbts2", False, 2), ("out-of-order", True, 1), ), ) def test_provenance_heuristics_content_find_all( provenance, swh_storage, archive, repo, lower, mindepth ): # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) fill_storage(swh_storage, data) revisions = [ RevisionEntry( id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], ) for revision in data["revision"] ] def maybe_path(path: str) -> str: if provenance.storage.with_path: return path return "" # XXX adding all revisions at once should be working just fine, but it does not... # revision_add(provenance, archive, revisions, lower=lower, mindepth=mindepth) # ...so add revisions one at a time for now for revision in revisions: revision_add(provenance, archive, [revision], lower=lower, mindepth=mindepth) syntheticfile = get_datafile( f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt" ) expected_occurrences = {} for synth_rev in synthetic_result(syntheticfile): rev_id = synth_rev["sha1"].hex() rev_ts = synth_rev["date"] for rc in synth_rev["R_C"]: expected_occurrences.setdefault(rc["dst"].hex(), []).append( - (rev_id, rev_ts, maybe_path(rc["path"])) + (rev_id, rev_ts, None, maybe_path(rc["path"])) ) for dc in synth_rev["D_C"]: assert dc["prefix"] is not None # to please mypy expected_occurrences.setdefault(dc["dst"].hex(), []).append( - (rev_id, rev_ts, maybe_path(dc["prefix"] + "/" + dc["path"])) + (rev_id, rev_ts, None, maybe_path(dc["prefix"] + "/" + dc["path"])) ) for content_id, results in expected_occurrences.items(): expected = [(content_id, *result) for result in results] db_occurrences = [ - (blob.hex(), rev.hex(), date.timestamp(), path.decode()) - for blob, rev, date, path in provenance.content_find_all( - hash_to_bytes(content_id) + ( + occur.content.hex(), + occur.revision.hex(), + occur.date.timestamp(), + occur.origin, + occur.path.decode(), ) + for occur in provenance.content_find_all(hash_to_bytes(content_id)) ] if provenance.storage.with_path: # this is not true if the db stores no path, because a same content # that appears several times in a given revision may be reported # only once by content_find_all() assert len(db_occurrences) == len(expected) assert set(db_occurrences) == set(expected) @pytest.mark.parametrize( "repo, lower, mindepth", ( ("cmdbts2", True, 1), ("cmdbts2", False, 1), ("cmdbts2", True, 2), ("cmdbts2", False, 2), ("out-of-order", True, 1), ), ) def test_provenance_heuristics_content_find_first( provenance, swh_storage, archive, repo, lower, mindepth ): # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) fill_storage(swh_storage, data) revisions = [ 
RevisionEntry( id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], ) for revision in data["revision"] ] # XXX adding all revisions at once should be working just fine, but it does not... # revision_add(provenance, archive, revisions, lower=lower, mindepth=mindepth) # ...so add revisions one at a time for now for revision in revisions: revision_add(provenance, archive, [revision], lower=lower, mindepth=mindepth) syntheticfile = get_datafile( f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt" ) expected_first: Dict[str, Tuple[str, str, List[str]]] = {} # dict of tuples (blob_id, rev_id, [path, ...]) the third element for path # is a list because a content can be added at several places in a single # revision, in which case the result of content_find_first() is one of # those path, but we have no guarantee which one it will return. for synth_rev in synthetic_result(syntheticfile): rev_id = synth_rev["sha1"].hex() rev_ts = synth_rev["date"] for rc in synth_rev["R_C"]: sha1 = rc["dst"].hex() if sha1 not in expected_first: assert rc["rel_ts"] == 0 expected_first[sha1] = (rev_id, rev_ts, [rc["path"]]) else: if rev_ts == expected_first[sha1][1]: expected_first[sha1][2].append(rc["path"]) elif rev_ts < expected_first[sha1][1]: expected_first[sha1] = (rev_id, rev_ts, rc["path"]) for dc in synth_rev["D_C"]: sha1 = rc["dst"].hex() assert sha1 in expected_first # nothing to do there, this content cannot be a "first seen file" for content_id, (rev_id, ts, paths) in expected_first.items(): - (r_sha1, r_rev_id, r_ts, r_path) = provenance.content_find_first( - hash_to_bytes(content_id) - ) - assert r_sha1.hex() == content_id - assert r_rev_id.hex() == rev_id - assert r_ts.timestamp() == ts + occur = provenance.content_find_first(hash_to_bytes(content_id)) + assert occur.content.hex() == content_id + assert occur.revision.hex() == rev_id + assert occur.date.timestamp() == ts + assert occur.origin is None if provenance.storage.with_path: - assert r_path.decode() in paths + assert occur.path.decode() in paths
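Since the patch renames ProvenanceBackend.commit() to flush() throughout, here is a minimal sketch of how client code is expected to drive the backend, assuming a local "provenance" database with the proper schema in place; the connection parameters are placeholders, not values taken from this patch:

import psycopg2

from swh.provenance.provenance import ProvenanceBackend

# Placeholder connection parameters; adjust host/dbname/credentials to your setup.
conn = psycopg2.connect(host="localhost", dbname="provenance")
provenance = ProvenanceBackend(conn)
provenance.raise_on_commit = True  # re-raise storage errors instead of logging and retrying

# ... calls such as revision_add() and origin_add() fill the in-memory cache here ...

provenance.flush()  # persist the cached rows through storage.commit()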