diff --git a/swh/provenance/cli.py b/swh/provenance/cli.py index cc38c38..00b5d3e 100644 --- a/swh/provenance/cli.py +++ b/swh/provenance/cli.py @@ -1,203 +1,203 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # WARNING: do not import unnecessary things here to keep cli startup time under # control import os from typing import Any, Dict, Optional import click import yaml from swh.core import config from swh.core.cli import CONTEXT_SETTINGS from swh.core.cli import swh as swh_cli_group from swh.model.hashutil import hash_to_bytes, hash_to_hex # All generic config code should reside in swh.core.config CONFIG_ENVVAR = "SWH_CONFIG_FILE" DEFAULT_CONFIG_PATH = os.path.join(click.get_app_dir("swh"), "global.yml") DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, DEFAULT_CONFIG_PATH) DEFAULT_CONFIG: Dict[str, Any] = { "archive": { "cls": "api", "storage": { "cls": "remote", "url": "http://uffizi.internal.softwareheritage.org:5002", } # "cls": "ps", # "db": { # "host": "db.internal.softwareheritage.org", # "dbname": "softwareheritage", # "user": "guest" # } }, "provenance": {"cls": "ps", "db": {"host": "localhost", "dbname": "provenance"}}, } CONFIG_FILE_HELP = f"""Configuration file: \b The CLI option or the environment variable will fail if invalid. CLI option is checked first. Then, environment variable {CONFIG_ENVVAR} is checked. Then, if cannot load the default path, a set of default values are used. Default config path is {DEFAULT_CONFIG_PATH}. Default config values are: \b {yaml.dump(DEFAULT_CONFIG)}""" PROVENANCE_HELP = f"""Software Heritage Scanner tools. {CONFIG_FILE_HELP}""" @swh_cli_group.group( name="provenance", context_settings=CONTEXT_SETTINGS, - help=PROVENANCE_HELP, + help=PROVENANCE_HELP ) @click.option( "-C", "--config-file", default=None, type=click.Path(exists=False, dir_okay=False, path_type=str), - help="""YAML configuration file.""", + help="""YAML configuration file.""" ) @click.option( "-P", "--profile", default=None, type=click.Path(exists=False, dir_okay=False, path_type=str), - help="""Enable profiling to specified file.""", + help="""Enable profiling to specified file.""" ) @click.pass_context def cli(ctx, config_file: Optional[str], profile: str): if config_file is None and config.config_exists(DEFAULT_PATH): config_file = DEFAULT_PATH if config_file is None: conf = DEFAULT_CONFIG else: # read_raw_config do not fail on ENOENT if not config.config_exists(config_file): raise FileNotFoundError(config_file) conf = config.read_raw_config(config.config_basepath(config_file)) conf = config.merge_configs(DEFAULT_CONFIG, conf) ctx.ensure_object(dict) ctx.obj["config"] = conf if profile: import cProfile import atexit print("Profiling...") pr = cProfile.Profile() pr.enable() def exit(): pr.disable() pr.dump_stats(profile) atexit.register(exit) @cli.command(name="create") @click.option("--name", default=None) @click.pass_context def create(ctx, name): """Create new provenance database.""" from .postgresql.db_utils import connect from .postgresql.provenance import create_database # Connect to server without selecting a database conninfo = ctx.obj["config"]["provenance"]["db"] conn = connect(conninfo) create_database(conn, conninfo, name) @cli.command(name="iter-revisions") @click.argument("filename") @click.option("-l", "--limit", type=int) @click.pass_context def iter_revisions(ctx, filename, limit): """Process a provided list of revisions.""" from . import get_archive, get_provenance from .revision import FileRevisionIterator from .provenance import revision_add archive = get_archive(**ctx.obj["config"]["archive"]) provenance = get_provenance(**ctx.obj["config"]["provenance"]) revisions = FileRevisionIterator(filename, archive, limit=limit) while True: revision = revisions.next() if revision is None: break revision_add(provenance, archive, revision) @cli.command(name="iter-origins") @click.argument("filename") @click.option("-l", "--limit", type=int) @click.pass_context def iter_origins(ctx, filename, limit): """Process a provided list of origins.""" from . import get_archive, get_provenance from .origin import FileOriginIterator from .provenance import origin_add archive = get_archive(**ctx.obj["config"]["archive"]) provenance = get_provenance(**ctx.obj["config"]["provenance"]) for origin in FileOriginIterator(filename, archive, limit=limit): origin_add(provenance, origin) @cli.command(name="find-first") @click.argument("swhid") @click.pass_context def find_first(ctx, swhid): """Find first occurrence of the requested blob.""" from . import get_provenance provenance = get_provenance(**ctx.obj["config"]["provenance"]) # TODO: return a dictionary with proper keys for each field row = provenance.content_find_first(hash_to_bytes(swhid)) if row is not None: print( "{blob}, {rev}, {date}, {path}".format( blob=hash_to_hex(row[0]), rev=hash_to_hex(row[1]), date=row[2], - path=os.fsdecode(row[3]), + path=os.fsdecode(row[3]) ) ) else: print(f"Cannot find a content with the id {swhid}") @cli.command(name="find-all") @click.argument("swhid") @click.pass_context def find_all(ctx, swhid): """Find all occurrences of the requested blob.""" from swh.provenance import get_provenance provenance = get_provenance(**ctx.obj["config"]["provenance"]) # TODO: return a dictionary with proper keys for each field for row in provenance.content_find_all(hash_to_bytes(swhid)): print( "{blob}, {rev}, {date}, {path}".format( blob=hash_to_hex(row[0]), rev=hash_to_hex(row[1]), date=row[2], - path=os.fsdecode(row[3]), + path=os.fsdecode(row[3]) ) ) diff --git a/swh/provenance/model.py b/swh/provenance/model.py index 37ff247..6e91e87 100644 --- a/swh/provenance/model.py +++ b/swh/provenance/model.py @@ -1,36 +1,36 @@ from .archive import ArchiveInterface class TreeEntry: def __init__(self, id: bytes, name: bytes): self.id = id self.name = name class DirectoryEntry(TreeEntry): def __init__(self, archive: ArchiveInterface, id: bytes, name: bytes): super().__init__(id, name) self.archive = archive self.children = None def __iter__(self): if self.children is None: self.children = [] for child in self.archive.directory_ls(self.id): if child["type"] == "dir": self.children.append( DirectoryEntry( self.archive, child["target"], - child["name"], + child["name"] ) ) elif child["type"] == "file": self.children.append(FileEntry(child["target"], child["name"])) return iter(self.children) class FileEntry(TreeEntry): pass diff --git a/swh/provenance/origin.py b/swh/provenance/origin.py index d54d79d..7d3d309 100644 --- a/swh/provenance/origin.py +++ b/swh/provenance/origin.py @@ -1,111 +1,111 @@ from .archive import ArchiveInterface from .revision import RevisionEntry from typing import Optional from swh.model.model import Origin, ObjectType, TargetType class OriginEntry: def __init__(self, url, revisions, id=None): self.id = id self.url = url self.revisions = revisions ################################################################################ ################################################################################ class OriginIterator: """Iterator interface.""" def __iter__(self): pass def __next__(self): pass class FileOriginIterator(OriginIterator): """Iterator over origins present in the given CSV file.""" def __init__( self, filename: str, archive: ArchiveInterface, limit: Optional[int] = None ): self.file = open(filename) self.limit = limit # self.mutex = threading.Lock() self.archive = archive def __iter__(self): yield from iterate_statuses( [Origin(url.strip()) for url in self.file], self.archive, self.limit ) class ArchiveOriginIterator: """Iterator over origins present in the given storage.""" def __init__(self, archive: ArchiveInterface, limit: Optional[int] = None): self.limit = limit # self.mutex = threading.Lock() self.archive = archive def __iter__(self): yield from iterate_statuses( self.archive.iter_origins(), self.archive, self.limit ) def iterate_statuses(origins, archive: ArchiveInterface, limit: Optional[int] = None): idx = 0 for origin in origins: for visit in archive.iter_origin_visits(origin.url): for status in archive.iter_origin_visit_statuses(origin.url, visit.visit): # TODO: may filter only those whose status is 'full'?? targets = [] releases = [] snapshot = archive.snapshot_get_all_branches(status.snapshot) if snapshot is not None: for branch in snapshot.branches: if snapshot.branches[branch].target_type == TargetType.REVISION: targets.append(snapshot.branches[branch].target) elif ( snapshot.branches[branch].target_type == TargetType.RELEASE ): releases.append(snapshot.branches[branch].target) # This is done to keep the query in release_get small, hence avoiding # a timeout. limit = 100 for i in range(0, len(releases), limit): for release in archive.release_get(releases[i : i + limit]): if release is not None: if release.target_type == ObjectType.REVISION: targets.append(release.target) # This is done to keep the query in revision_get small, hence avoiding # a timeout. revisions = [] limit = 100 for i in range(0, len(targets), limit): for revision in archive.revision_get(targets[i : i + limit]): if revision is not None: parents = list( map( lambda id: RevisionEntry(archive, id), - revision.parents, + revision.parents ) ) revisions.append( RevisionEntry(archive, revision.id, parents=parents) ) yield OriginEntry(status.origin, revisions) idx = idx + 1 if idx == limit: return diff --git a/swh/provenance/postgresql/archive.py b/swh/provenance/postgresql/archive.py index e830363..c90f0d6 100644 --- a/swh/provenance/postgresql/archive.py +++ b/swh/provenance/postgresql/archive.py @@ -1,78 +1,78 @@ import psycopg2 import threading from ..archive import ArchiveInterface # from functools import lru_cache from methodtools import lru_cache from typing import List class ArchivePostgreSQL(ArchiveInterface): def __init__(self, conn: psycopg2.extensions.connection): self.conn = conn self.cursor = conn.cursor() self.mutex = threading.Lock() @lru_cache(maxsize=1000000) def directory_ls(self, id: bytes): self.mutex.acquire() self.cursor.execute( """WITH dir AS (SELECT id AS dir_id, dir_entries, file_entries, rev_entries FROM directory WHERE id=%s), ls_d AS (SELECT dir_id, unnest(dir_entries) AS entry_id from dir), ls_f AS (SELECT dir_id, unnest(file_entries) AS entry_id from dir), ls_r AS (SELECT dir_id, unnest(rev_entries) AS entry_id from dir) (SELECT 'dir'::directory_entry_type AS type, e.target, e.name, NULL::sha1_git FROM ls_d LEFT JOIN directory_entry_dir e ON ls_d.entry_id=e.id) UNION (WITH known_contents AS (SELECT 'file'::directory_entry_type AS type, e.target, e.name, c.sha1_git FROM ls_f LEFT JOIN directory_entry_file e ON ls_f.entry_id=e.id INNER JOIN content c ON e.target=c.sha1_git) SELECT * FROM known_contents UNION (SELECT 'file'::directory_entry_type AS type, e.target, e.name, c.sha1_git FROM ls_f LEFT JOIN directory_entry_file e ON ls_f.entry_id=e.id LEFT JOIN skipped_content c ON e.target=c.sha1_git WHERE NOT EXISTS ( SELECT 1 FROM known_contents WHERE known_contents.sha1_git=e.target ) ) ) ORDER BY name """, - (id,), + (id,) ) entries = [ {"type": row[0], "target": row[1], "name": row[2]} for row in self.cursor.fetchall() ] self.mutex.release() return entries def iter_origins(self): raise NotImplementedError def iter_origin_visits(self, origin: str): raise NotImplementedError def iter_origin_visit_statuses(self, origin: str, visit: int): raise NotImplementedError def release_get(self, ids: List[bytes]): raise NotImplementedError def revision_get(self, ids: List[bytes]): raise NotImplementedError def snapshot_get_all_branches(self, snapshot: bytes): raise NotImplementedError diff --git a/swh/provenance/postgresql/provenance.py b/swh/provenance/postgresql/provenance.py index 1c63dda..20eb88f 100644 --- a/swh/provenance/postgresql/provenance.py +++ b/swh/provenance/postgresql/provenance.py @@ -1,406 +1,406 @@ import itertools import logging import os import psycopg2 import psycopg2.extras from ..model import DirectoryEntry, FileEntry from ..origin import OriginEntry from .db_utils import connect, execute_sql from ..provenance import ProvenanceInterface from ..revision import RevisionEntry from datetime import datetime from typing import Any, Dict, List, Optional def normalize(path: bytes) -> bytes: return path[2:] if path.startswith(bytes("." + os.path.sep, "utf-8")) else path def create_database(conn: psycopg2.extensions.connection, conninfo: dict, name: str): conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) # Normalize dbname to avoid issues when reconnecting below name = name.casefold() # Create new database dropping previous one if exists cursor = conn.cursor() cursor.execute(f"""DROP DATABASE IF EXISTS {name}""") cursor.execute(f"""CREATE DATABASE {name}""") conn.close() # Reconnect to server selecting newly created database to add tables conninfo["dbname"] = name conn = connect(conninfo) sqldir = os.path.dirname(os.path.realpath(__file__)) execute_sql(conn, os.path.join(sqldir, "provenance.sql")) ######################################################################################## ######################################################################################## ######################################################################################## class ProvenancePostgreSQL(ProvenanceInterface): def __init__(self, conn: psycopg2.extensions.connection): # TODO: consider adding a mutex for thread safety conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) self.conn = conn self.cursor = self.conn.cursor() self.insert_cache: Dict[str, Any] = {} self.remove_cache: Dict[str, Any] = {} self.select_cache: Dict[str, Any] = {} self.clear_caches() def clear_caches(self): self.insert_cache = { "content": dict(), "content_early_in_rev": list(), "content_in_dir": list(), "directory": dict(), "directory_in_rev": list(), "revision": dict(), "revision_before_rev": list(), "revision_in_org": list(), } self.remove_cache = {"directory": dict()} self.select_cache = {"content": dict(), "directory": dict(), "revision": dict()} def commit(self): result = False try: self.insert_all() # self.conn.commit() result = True # except psycopg2.DatabaseError: # # Database error occurred, rollback all changes # self.conn.rollback() # # TODO: maybe serialize and auto-merge transations. # # The only conflicts are on: # # - content: we keep the earliest date # # - directory: we keep the earliest date # # - content_in_dir: there should be just duplicated entries. except Exception as error: # Unexpected error occurred, rollback all changes and log message logging.error(f"Unexpected error: {error}") # self.conn.rollback() finally: self.clear_caches() return result def content_add_to_directory( self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes ): self.insert_cache["content_in_dir"].append( (blob.id, directory.id, normalize(os.path.join(prefix, blob.name))) ) def content_add_to_revision( self, revision: RevisionEntry, blob: FileEntry, prefix: bytes ): self.insert_cache["content_early_in_rev"].append( (blob.id, revision.id, normalize(os.path.join(prefix, blob.name))) ) def content_find_first(self, blobid: bytes): self.cursor.execute( """SELECT blob, rev, date, path FROM content_early_in_rev JOIN revision ON revision.id=content_early_in_rev.rev WHERE content_early_in_rev.blob=%s ORDER BY date, rev, path ASC LIMIT 1""", - (blobid,), + (blobid,) ) return self.cursor.fetchone() def content_find_all(self, blobid: bytes): self.cursor.execute( """(SELECT blob, rev, date, path FROM content_early_in_rev JOIN revision ON revision.id=content_early_in_rev.rev WHERE content_early_in_rev.blob=%s) UNION (SELECT content_in_rev.blob, content_in_rev.rev, revision.date, content_in_rev.path FROM (SELECT content_in_dir.blob, directory_in_rev.rev, CASE directory_in_rev.path WHEN '' THEN content_in_dir.path WHEN '.' THEN content_in_dir.path ELSE (directory_in_rev.path || '/' || content_in_dir.path)::unix_path END AS path FROM content_in_dir JOIN directory_in_rev ON content_in_dir.dir=directory_in_rev.dir WHERE content_in_dir.blob=%s ) AS content_in_rev JOIN revision ON revision.id=content_in_rev.rev ) ORDER BY date, rev, path""", - (blobid, blobid), + (blobid, blobid) ) # POSTGRESQL EXPLAIN yield from self.cursor.fetchall() def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: # First check if the date is being modified by current transection. date = self.insert_cache["content"].get(blob.id, None) if date is None: # If not, check whether it's been query before date = self.select_cache["content"].get(blob.id, None) if date is None: # Otherwise, query the database and cache the value self.cursor.execute( """SELECT date FROM content WHERE id=%s""", (blob.id,) ) row = self.cursor.fetchone() date = row[0] if row is not None else None self.select_cache["content"][blob.id] = date return date def content_get_early_dates(self, blobs: List[FileEntry]) -> Dict[bytes, datetime]: dates = {} pending = [] for blob in blobs: # First check if the date is being modified by current transection. date = self.insert_cache["content"].get(blob.id, None) if date is not None: dates[blob.id] = date else: # If not, check whether it's been query before date = self.select_cache["content"].get(blob.id, None) if date is not None: dates[blob.id] = date else: pending.append(blob.id) if pending: # Otherwise, query the database and cache the values values = ", ".join(itertools.repeat("%s", len(pending))) self.cursor.execute( f"""SELECT id, date FROM content WHERE id IN ({values})""", - tuple(pending), + tuple(pending) ) for row in self.cursor.fetchall(): dates[row[0]] = row[1] self.select_cache["content"][row[0]] = row[1] return dates def content_set_early_date(self, blob: FileEntry, date: datetime): self.insert_cache["content"][blob.id] = date def directory_add_to_revision( self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes ): self.insert_cache["directory_in_rev"].append( (directory.id, revision.id, normalize(path)) ) def directory_get_date_in_isochrone_frontier( self, directory: DirectoryEntry ) -> Optional[datetime]: # First check if the date is being modified by current transection. date = self.insert_cache["directory"].get(directory.id, None) if date is None and directory.id not in self.remove_cache["directory"]: # If not, check whether it's been query before date = self.select_cache["directory"].get(directory.id, None) if date is None: # Otherwise, query the database and cache the value self.cursor.execute( """SELECT date FROM directory WHERE id=%s""", (directory.id,) ) row = self.cursor.fetchone() date = row[0] if row is not None else None self.select_cache["directory"][directory.id] = date return date def directory_get_dates_in_isochrone_frontier( self, dirs: List[DirectoryEntry] ) -> Dict[bytes, datetime]: dates = {} pending = [] for directory in dirs: # First check if the date is being modified by current transection. date = self.insert_cache["directory"].get(directory.id, None) if date is not None: dates[directory.id] = date elif directory.id not in self.remove_cache["directory"]: # If not, check whether it's been query before date = self.select_cache["directory"].get(directory.id, None) if date is not None: dates[directory.id] = date else: pending.append(directory.id) if pending: # Otherwise, query the database and cache the values values = ", ".join(itertools.repeat("%s", len(pending))) self.cursor.execute( f"""SELECT id, date FROM directory WHERE id IN ({values})""", - tuple(pending), + tuple(pending) ) for row in self.cursor.fetchall(): dates[row[0]] = row[1] self.select_cache["directory"][row[0]] = row[1] return dates def directory_invalidate_in_isochrone_frontier(self, directory: DirectoryEntry): self.remove_cache["directory"][directory.id] = None self.insert_cache["directory"].pop(directory.id, None) def directory_set_date_in_isochrone_frontier( self, directory: DirectoryEntry, date: datetime ): self.insert_cache["directory"][directory.id] = date self.remove_cache["directory"].pop(directory.id, None) def insert_all(self): # Performe insertions with cached information if self.insert_cache["content"]: psycopg2.extras.execute_values( self.cursor, """INSERT INTO content(id, date) VALUES %s ON CONFLICT (id) DO UPDATE SET date=LEAST(EXCLUDED.date,content.date)""", - self.insert_cache["content"].items(), + self.insert_cache["content"].items() ) if self.insert_cache["content_early_in_rev"]: psycopg2.extras.execute_values( self.cursor, """INSERT INTO content_early_in_rev VALUES %s ON CONFLICT DO NOTHING""", - self.insert_cache["content_early_in_rev"], + self.insert_cache["content_early_in_rev"] ) if self.insert_cache["content_in_dir"]: psycopg2.extras.execute_values( self.cursor, """INSERT INTO content_in_dir VALUES %s ON CONFLICT DO NOTHING""", - self.insert_cache["content_in_dir"], + self.insert_cache["content_in_dir"] ) if self.insert_cache["directory"]: psycopg2.extras.execute_values( self.cursor, """INSERT INTO directory(id, date) VALUES %s ON CONFLICT (id) DO UPDATE SET date=LEAST(EXCLUDED.date,directory.date)""", - self.insert_cache["directory"].items(), + self.insert_cache["directory"].items() ) if self.insert_cache["directory_in_rev"]: psycopg2.extras.execute_values( self.cursor, """INSERT INTO directory_in_rev VALUES %s ON CONFLICT DO NOTHING""", - self.insert_cache["directory_in_rev"], + self.insert_cache["directory_in_rev"] ) if self.insert_cache["revision"]: psycopg2.extras.execute_values( self.cursor, """INSERT INTO revision(id, date) VALUES %s ON CONFLICT (id) DO UPDATE SET date=LEAST(EXCLUDED.date,revision.date)""", - self.insert_cache["revision"].items(), + self.insert_cache["revision"].items() ) if self.insert_cache["revision_before_rev"]: psycopg2.extras.execute_values( self.cursor, """INSERT INTO revision_before_rev VALUES %s ON CONFLICT DO NOTHING""", - self.insert_cache["revision_before_rev"], + self.insert_cache["revision_before_rev"] ) if self.insert_cache["revision_in_org"]: psycopg2.extras.execute_values( self.cursor, """INSERT INTO revision_in_org VALUES %s ON CONFLICT DO NOTHING""", - self.insert_cache["revision_in_org"], + self.insert_cache["revision_in_org"] ) def origin_get_id(self, origin: OriginEntry) -> int: if origin.id is None: # Check if current origin is already known and retrieve its internal id. self.cursor.execute("""SELECT id FROM origin WHERE url=%s""", (origin.url,)) row = self.cursor.fetchone() if row is None: # If the origin is seen for the first time, current revision is # the prefered one. self.cursor.execute( """INSERT INTO origin (url) VALUES (%s) RETURNING id""", - (origin.url,), + (origin.url,) ) return self.cursor.fetchone()[0] else: return row[0] else: return origin.id def revision_add(self, revision: RevisionEntry): # Add current revision to the compact DB self.insert_cache["revision"][revision.id] = revision.date def revision_add_before_revision( self, relative: RevisionEntry, revision: RevisionEntry ): self.insert_cache["revision_before_rev"].append((revision.id, relative.id)) def revision_add_to_origin(self, origin: OriginEntry, revision: RevisionEntry): self.insert_cache["revision_in_org"].append((revision.id, origin.id)) def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]: date = self.insert_cache["revision"].get(revision.id, None) if date is None: # If not, check whether it's been query before date = self.select_cache["revision"].get(revision.id, None) if date is None: # Otherwise, query the database and cache the value self.cursor.execute( """SELECT date FROM revision WHERE id=%s""", (revision.id,) ) row = self.cursor.fetchone() date = row[0] if row is not None else None self.select_cache["revision"][revision.id] = date return date def revision_get_prefered_origin(self, revision: RevisionEntry) -> int: # TODO: adapt this method to consider cached values self.cursor.execute( """SELECT COALESCE(org,0) FROM revision WHERE id=%s""", (revision.id,) ) row = self.cursor.fetchone() # None means revision is not in database # 0 means revision has no prefered origin return row[0] if row is not None and row[0] != 0 else None def revision_in_history(self, revision: RevisionEntry) -> bool: # TODO: adapt this method to consider cached values self.cursor.execute( """SELECT 1 FROM revision_before_rev WHERE prev=%s""", (revision.id,) ) return self.cursor.fetchone() is not None def revision_set_prefered_origin( self, origin: OriginEntry, revision: RevisionEntry ): # TODO: adapt this method to consider cached values self.cursor.execute( """UPDATE revision SET org=%s WHERE id=%s""", (origin.id, revision.id) ) def revision_visited(self, revision: RevisionEntry) -> bool: # TODO: adapt this method to consider cached values self.cursor.execute( """SELECT 1 FROM revision_in_org WHERE rev=%s""", (revision.id,) ) return self.cursor.fetchone() is not None diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py index 43e4447..ec47d9e 100644 --- a/swh/provenance/provenance.py +++ b/swh/provenance/provenance.py @@ -1,425 +1,425 @@ import os from .archive import ArchiveInterface from .model import DirectoryEntry, FileEntry from .origin import OriginEntry from .revision import RevisionEntry from datetime import datetime from typing import Dict, List, Optional, Tuple # TODO: consider moving to path utils file together with normalize. def is_child(path: bytes, prefix: bytes) -> bool: return path != prefix and os.path.dirname(path) == prefix # def is_not_prefix(prefix: bytes, path: bytes) -> bool: # return not path.startswith(prefix) class ProvenanceInterface: def __init__(self, **kwargs): raise NotImplementedError def commit(self): raise NotImplementedError def content_add_to_directory( self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes ): raise NotImplementedError def content_add_to_revision( self, revision: RevisionEntry, blob: FileEntry, prefix: bytes ): raise NotImplementedError def content_find_first(self, blobid: bytes): raise NotImplementedError def content_find_all(self, blobid: bytes): raise NotImplementedError def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: raise NotImplementedError def content_get_early_dates(self, blobs: List[FileEntry]) -> Dict[bytes, datetime]: raise NotImplementedError def content_set_early_date(self, blob: FileEntry, date: datetime): raise NotImplementedError def directory_add_to_revision( self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes ): raise NotImplementedError def directory_get_date_in_isochrone_frontier( self, directory: DirectoryEntry ) -> Optional[datetime]: raise NotImplementedError def directory_get_dates_in_isochrone_frontier( self, dirs: List[DirectoryEntry] ) -> Dict[bytes, datetime]: raise NotImplementedError def directory_invalidate_in_isochrone_frontier(self, directory: DirectoryEntry): raise NotImplementedError def directory_set_date_in_isochrone_frontier( self, directory: DirectoryEntry, date: datetime ): raise NotImplementedError def origin_get_id(self, origin: OriginEntry) -> int: raise NotImplementedError def revision_add(self, revision: RevisionEntry): raise NotImplementedError def revision_add_before_revision( self, relative: RevisionEntry, revision: RevisionEntry ): raise NotImplementedError def revision_add_to_origin(self, origin: OriginEntry, revision: RevisionEntry): raise NotImplementedError def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]: raise NotImplementedError def revision_get_prefered_origin(self, revision: RevisionEntry) -> int: raise NotImplementedError def revision_in_history(self, revision: RevisionEntry) -> bool: raise NotImplementedError def revision_set_prefered_origin( self, origin: OriginEntry, revision: RevisionEntry ): raise NotImplementedError def revision_visited(self, revision: RevisionEntry) -> bool: raise NotImplementedError def directory_process_content( provenance: ProvenanceInterface, directory: DirectoryEntry, relative: DirectoryEntry ): stack = [(directory, b"")] while stack: current, prefix = stack.pop() for child in iter(current): if isinstance(child, FileEntry): # Add content to the relative directory with the computed prefix. provenance.content_add_to_directory(relative, child, prefix) else: # Recursively walk the child directory. stack.append((child, os.path.join(prefix, child.name))) def directory_update_content( stack: List[Tuple[DirectoryEntry, bytes]], provenance: ProvenanceInterface, revision: RevisionEntry, directory: DirectoryEntry, prefix: bytes, subdirs: Optional[List[DirectoryEntry]] = None, blobs: Optional[List[FileEntry]] = None, - blobdates: Optional[Dict[bytes, datetime]] = None, + blobdates: Optional[Dict[bytes, datetime]] = None ): assert revision.date is not None # Init optional parameters if not provided. if subdirs is None: subdirs = [child for child in directory if isinstance(child, DirectoryEntry)] if blobs is None: blobs = [child for child in directory if isinstance(child, FileEntry)] if blobdates is None: blobdates = provenance.content_get_early_dates(blobs) # Iterate over blobs updating their date if necessary. for blob in blobs: date = blobdates.get(blob.id, None) if date is None or revision.date < date: provenance.content_set_early_date(blob, revision.date) # Push all subdirectories with its corresponding path to analyze them # recursively. for subdir in subdirs: stack.append((subdir, os.path.join(prefix, subdir.name))) def origin_add(provenance: ProvenanceInterface, origin: OriginEntry): # TODO: refactor to iterate over origin visit statuses and commit only once # per status. origin.id = provenance.origin_get_id(origin) for revision in origin.revisions: origin_add_revision(provenance, origin, revision) # Commit after each revision provenance.commit() # TODO: verify this! def origin_add_revision( provenance: ProvenanceInterface, origin: OriginEntry, revision: RevisionEntry ): stack: List[Tuple[Optional[RevisionEntry], RevisionEntry]] = [(None, revision)] while stack: relative, current = stack.pop() # Check if current revision has no prefered origin and update if necessary. prefered = provenance.revision_get_prefered_origin(current) if prefered is None: provenance.revision_set_prefered_origin(origin, current) ######################################################################## if relative is None: # This revision is pointed directly by the origin. visited = provenance.revision_visited(current) provenance.revision_add_to_origin(origin, current) if not visited: stack.append((current, current)) else: # This revision is a parent of another one in the history of the # relative revision. for parent in iter(current): visited = provenance.revision_visited(parent) if not visited: # The parent revision has never been seen before pointing # directly to an origin. known = provenance.revision_in_history(parent) if known: # The parent revision is already known in some other # revision's history. We should point it directly to # the origin and (eventually) walk its history. stack.append((None, parent)) else: # The parent revision was never seen before. We should # walk its history and associate it with the same # relative revision. provenance.revision_add_before_revision(relative, parent) stack.append((relative, parent)) else: # The parent revision already points to an origin, so its # history was properly processed before. We just need to # make sure it points to the current origin as well. provenance.revision_add_to_origin(origin, parent) def revision_add( provenance: ProvenanceInterface, archive: ArchiveInterface, revision: RevisionEntry ): assert revision.date is not None assert revision.root is not None # Processed content starting from the revision's root directory date = provenance.revision_get_early_date(revision) if date is None or revision.date < date: provenance.revision_add(revision) revision_process_content( provenance, revision, DirectoryEntry(archive, revision.root, b"") ) return provenance.commit() def revision_process_content( provenance: ProvenanceInterface, revision: RevisionEntry, root: DirectoryEntry ): assert revision.date is not None # Stack of directories (and their paths) to be processed. stack: List[Tuple[DirectoryEntry, bytes]] = [(root, root.name)] # This dictionary will hold the computed dates for visited subdirectories inside the # isochrone frontier. innerdirs: Dict[bytes, Tuple[DirectoryEntry, datetime]] = {} # This dictionary will hold the computed dates for visited subdirectories outside # the isochrone frontier which are candidates to be added to the outer frontier (if # their parent is in the inner frontier). outerdirs: Dict[bytes, Tuple[DirectoryEntry, datetime]] = {} while stack: # Get next directory to process and query its date right before processing to be # sure we get the most recently updated value. current, prefix = stack.pop() date = provenance.directory_get_date_in_isochrone_frontier(current) if date is None: # The directory has never been seen on the outer isochrone frontier of # previously processed revisions. Its children should be analyzed. blobs = [child for child in current if isinstance(child, FileEntry)] subdirs = [child for child in current if isinstance(child, DirectoryEntry)] # Get the list of ids with no duplicates to ensure we have available dates # for all the elements. This prevents taking a wrong decision when a blob # occurs more than once in the same directory. ids = list(dict.fromkeys([child.id for child in blobs + subdirs])) if ids: # Known dates for the blobs in the current directory. blobdates = provenance.content_get_early_dates(blobs) # Known dates for the subdirectories in the current directory that # belong to the outer isochrone frontier of some previously processed # revision. knowndates = provenance.directory_get_dates_in_isochrone_frontier( subdirs ) # Known dates for the subdirectories in the current directory that are # inside the isochrone frontier of the revision. innerdates = { innerdir.id: innerdate for path, (innerdir, innerdate) in innerdirs.items() if is_child(path, prefix) } # Known dates for the subdirectories in the current directory that are # outside the isochrone frontier of the revision. outerdates = { outerdir.id: outerdate for path, (outerdir, outerdate) in outerdirs.items() if is_child(path, prefix) } # All known dates for child nodes of the current directory. assert not (innerdates.keys() & outerdates.keys()) dates = list( {**blobdates, **knowndates, **innerdates, **outerdates}.values() ) if len(dates) == len(ids): # All child nodes of current directory are already known. maxdate = max(dates) if maxdate < revision.date: # The directory is outside the isochrone frontier of the # revision. It is a candidate to be added to the outer frontier. outerdirs[prefix] = (current, maxdate) # Its children are removed since they are no longer candidates. outerdirs = { path: outerdir for path, outerdir in outerdirs.items() if not is_child(path, prefix) } elif maxdate == revision.date: # The current directory is inside the isochrone frontier. innerdirs[prefix] = (current, revision.date) # Add blobs present in this level to the revision. No need to # update dates as they are at most equal to current one. for blob in blobs: provenance.content_add_to_revision(revision, blob, prefix) # If any of its children was found outside the frontier it # should be added to the outer frontier now. if outerdates: for path, (outerdir, outerdate) in outerdirs.items(): if is_child(path, prefix): provenance.directory_set_date_in_isochrone_frontier( outerdir, outerdate ) provenance.directory_add_to_revision( revision, outerdir, path ) directory_process_content( provenance, directory=outerdir, relative=outerdir ) # Removed processed elements to avoid duplicating work. outerdirs = { path: outerdir for path, outerdir in outerdirs.items() if not is_child(path, prefix) } # There can still be subdirectories that are known to be in the # outter isochrone frontier of previous processed revisions. # Thus, they are not in the list of candidates but have to be # added to current revisions as well. for subdir in subdirs: knowndate = knowndates.get(subdir.id, None) if knowndate is not None and knowndate <= revision.date: # Less or equal since the directory could have been # added to the outer isochrone frontier when processing # a different directory's subtree of this very same # revision. provenance.directory_add_to_revision( revision, subdir, os.path.join(prefix, subdir.name) ) else: # The revision is out of order. The current directory does not # belong to the outer isochrone frontier of any previously # processed revision yet all its children nodes are known. They # should be re-analyzed (and timestamps eventually updated) and # current directory updated after them. stack.append((current, prefix)) directory_update_content( stack, provenance, revision, current, prefix, subdirs=subdirs, blobs=blobs, - blobdates=blobdates, + blobdates=blobdates ) else: # Al least one child node is unknown, ie. the current directory is # inside the isochrone frontier of the current revision. Its child # nodes should be analyzed and current directory updated after them. stack.append((current, prefix)) directory_update_content( stack, provenance, revision, current, prefix, subdirs=subdirs, blobs=blobs, - blobdates=blobdates, + blobdates=blobdates ) else: # Empty directory. Just consider it to be in the inner frontier of # current revision (ie. all its children are already "known"). innerdirs[prefix] = (current, revision.date) elif revision.date < date: # The revision is out of order. The current directory belongs to the outer # isochrone frontier of some previously processed revison but current # revision is earlier. The frontier record should be invalidated, children # nodes re-analyzed (and timestamps eventually updated) and current # directory updated after them. stack.append((current, prefix)) provenance.directory_invalidate_in_isochrone_frontier(current) directory_update_content(stack, provenance, revision, current, prefix) else: # The directory has already been seen on the outer isochrone frontier of an # earlier revision. Just add it to the current revision. provenance.directory_add_to_revision(revision, current, prefix) if outerdirs: # This should only happen if the root directory is in the outer frontier. if not (len(outerdirs) == 1 and root.name in outerdirs): print(outerdirs) assert len(outerdirs) == 1 and root.name in outerdirs outerdir, outerdate = outerdirs[root.name] provenance.directory_set_date_in_isochrone_frontier(outerdir, outerdate) provenance.directory_add_to_revision(revision, outerdir, root.name) directory_process_content(provenance, directory=outerdir, relative=outerdir) diff --git a/swh/provenance/revision.py b/swh/provenance/revision.py index 8fdfb5b..04924b7 100644 --- a/swh/provenance/revision.py +++ b/swh/provenance/revision.py @@ -1,184 +1,184 @@ import threading from .archive import ArchiveInterface from datetime import datetime from typing import Optional from swh.model.hashutil import hash_to_bytes class RevisionEntry: def __init__( self, archive: ArchiveInterface, id: bytes, date: Optional[datetime] = None, root: Optional[bytes] = None, - parents: Optional[list] = None, + parents: Optional[list] = None ): self.archive = archive self.id = id self.date = date self.parents = parents self.root = root def __iter__(self): if self.parents is None: self.parents = [] for parent in self.archive.revision_get([self.id]): if parent is not None: self.parents.append( RevisionEntry( self.archive, parent.id, parents=[ RevisionEntry(self.archive, id) for id in parent.parents - ], + ] ) ) return iter(self.parents) ######################################################################################## ######################################################################################## class RevisionIterator: """Iterator interface.""" def __iter__(self): pass def __next__(self): pass class FileRevisionIterator(RevisionIterator): """Iterator over revisions present in the given CSV file.""" def __init__( self, filename: str, archive: ArchiveInterface, limit: Optional[int] = None ): self.file = open(filename) self.idx = 0 self.limit = limit self.mutex = threading.Lock() self.archive = archive def next(self): self.mutex.acquire() line = self.file.readline().strip() if line and (self.limit is None or self.idx < self.limit): self.idx = self.idx + 1 id, date, root = line.strip().split(",") self.mutex.release() return RevisionEntry( self.archive, hash_to_bytes(id), date=datetime.fromisoformat(date), - root=hash_to_bytes(root), + root=hash_to_bytes(root) ) else: self.mutex.release() return None # class ArchiveRevisionIterator(RevisionIterator): # """Iterator over revisions present in the given database.""" # # def __init__(self, conn, limit=None, chunksize=100): # self.cur = conn.cursor() # self.chunksize = chunksize # self.records = [] # if limit is None: # self.cur.execute('''SELECT id, date, committer_date, directory # FROM revision''') # else: # self.cur.execute('''SELECT id, date, committer_date, directory # FROM revision # LIMIT %s''', (limit,)) # for row in self.cur.fetchmany(self.chunksize): # record = self.make_record(row) # if record is not None: # self.records.append(record) # self.mutex = threading.Lock() # # def __del__(self): # self.cur.close() # # def next(self): # self.mutex.acquire() # if not self.records: # self.records.clear() # for row in self.cur.fetchmany(self.chunksize): # record = self.make_record(row) # if record is not None: # self.records.append(record) # # if self.records: # revision, *self.records = self.records # self.mutex.release() # return revision # else: # self.mutex.release() # return None # # def make_record(self, row): # # Only revision with author or commiter date are considered # if row[1] is not None: # # If the revision has author date, it takes precedence # return RevisionEntry(row[0], row[1], row[3]) # elif row[2] is not None: # # If not, we use the commiter date # return RevisionEntry(row[0], row[2], row[3]) ######################################################################################## ######################################################################################## # class RevisionWorker(threading.Thread): # def __init__( # self, # id: int, # conninfo: dict, # archive: ArchiveInterface, # revisions: RevisionIterator # ): # from .provenance import get_provenance # # super().__init__() # self.archive = archive # self.id = id # self.provenance = get_provenance(conninfo) # self.revisions = revisions # # # def run(self): # from .provenance import revision_add # # # while True: # revision = self.revisions.next() # if revision is None: break # # processed = False # while not processed: # logging.info( # f'Thread {( # self.id # )} - Processing revision {( # hash_to_hex(revision.id) # )} (timestamp: {revision.date})' # ) # processed = revision_add(self.provenance, self.archive, revision) # if not processed: # logging.warning( # f'Thread {( # self.id # )} - Failed to process revision {( # hash_to_hex(revision.id) # )} (timestamp: {revision.date})' # )