diff --git a/.gitignore b/.gitignore index d3a8e3a..d01f7be 100644 --- a/.gitignore +++ b/.gitignore @@ -1,15 +1,16 @@ *.csv *.egg* *.log *.prof *.zip +.tox .vscode __pycache__ compact.egg.info content-revision build data debian dumps PKG-INFO revision-origin diff --git a/contents.py b/contents.py index 5f5b5da..2cd125c 100644 --- a/contents.py +++ b/contents.py @@ -1,52 +1,57 @@ import os -import psycopg2 -from swh.model.hashutil import (hash_to_bytes, hash_to_hex) +from swh.model.hashutil import hash_to_hex from swh.provenance.provenance import get_provenance if __name__ == "__main__": conninfo = { "host": "localhost", "database": "new_1000", "user": "postgres", - "password": "postgres" + "password": "postgres", } provenance = get_provenance(conninfo) - print('content(id, date): ################################################') - provenance.cursor.execute('''SELECT id, date FROM content ORDER BY id''') + print("content(id, date): ################################################") + provenance.cursor.execute("""SELECT id, date FROM content ORDER BY id""") for row in provenance.cursor.fetchall(): - print(f'{hash_to_hex(row[0])}, {row[1]}') - print('###################################################################') + print(f"{hash_to_hex(row[0])}, {row[1]}") + print("###################################################################") - print('content_early_in_rev(blob, rev, path): ############################') - provenance.cursor.execute('''SELECT blob, rev, path FROM content_early_in_rev ORDER BY blob, rev, path''') + print("content_early_in_rev(blob, rev, path): ############################") + provenance.cursor.execute( + """SELECT blob, rev, path FROM content_early_in_rev ORDER BY blob, rev, path""" + ) for row in provenance.cursor.fetchall(): - print(f'{row[0]}, {row[1]}, {row[2]}') - print(f'{hash_to_hex(row[0])}, {hash_to_hex(row[1])}, {os.fsdecode(row[2])}') - print('###################################################################') - - print('content_in_dir(blob, dir, path): ##################################') - provenance.cursor.execute('''SELECT blob, dir, path FROM content_in_dir ORDER BY blob, dir, path''') + print(f"{row[0]}, {row[1]}, {row[2]}") + print(f"{hash_to_hex(row[0])}, {hash_to_hex(row[1])}, {os.fsdecode(row[2])}") + print("###################################################################") + + print("content_in_dir(blob, dir, path): ##################################") + provenance.cursor.execute( + """SELECT blob, dir, path FROM content_in_dir ORDER BY blob, dir, path""" + ) for row in provenance.cursor.fetchall(): - print(f'{hash_to_hex(row[0])}, {hash_to_hex(row[1])}, {os.fsdecode(row[2])}') - print('###################################################################') + print(f"{hash_to_hex(row[0])}, {hash_to_hex(row[1])}, {os.fsdecode(row[2])}") + print("###################################################################") - print('directory(id, date): ##############################################') - provenance.cursor.execute('''SELECT id, date FROM directory ORDER BY id''') + print("directory(id, date): ##############################################") + provenance.cursor.execute("""SELECT id, date FROM directory ORDER BY id""") for row in provenance.cursor.fetchall(): - print(f'{hash_to_hex(row[0])}, {row[1]}') - print('###################################################################') + print(f"{hash_to_hex(row[0])}, {row[1]}") + print("###################################################################") - print('directory_in_rev(dir, rev, 
path): #################################') - provenance.cursor.execute('''SELECT dir, rev, path FROM directory_in_rev ORDER BY dir, rev, path''') + print("directory_in_rev(dir, rev, path): #################################") + provenance.cursor.execute( + """SELECT dir, rev, path FROM directory_in_rev ORDER BY dir, rev, path""" + ) for row in provenance.cursor.fetchall(): - print(f'{hash_to_hex(row[0])}, {hash_to_hex(row[1])}, {os.fsdecode(row[2])}') - print('###################################################################') + print(f"{hash_to_hex(row[0])}, {hash_to_hex(row[1])}, {os.fsdecode(row[2])}") + print("###################################################################") - print('revision(id, date): ###############################################') - provenance.cursor.execute('''SELECT id, date FROM revision ORDER BY id''') + print("revision(id, date): ###############################################") + provenance.cursor.execute("""SELECT id, date FROM revision ORDER BY id""") for row in provenance.cursor.fetchall(): - print(f'{hash_to_hex(row[0])}, {row[1]}') - print('###################################################################') + print(f"{hash_to_hex(row[0])}, {row[1]}") + print("###################################################################") diff --git a/mypy.ini b/mypy.ini index 46f8db8..5bf1e3b 100644 --- a/mypy.ini +++ b/mypy.ini @@ -1,15 +1,18 @@ [mypy] namespace_packages = True warn_unused_ignores = True # 3rd party libraries without stubs (yet) [mypy-pkg_resources.*] ignore_missing_imports = True [mypy-pytest.*] ignore_missing_imports = True -# [mypy-add_your_lib_here.*] -# ignore_missing_imports = True +[mypy-psycopg2.*] +ignore_missing_imports = True + +[mypy-methodtools.*] +ignore_missing_imports = True diff --git a/requirements-swh.txt b/requirements-swh.txt index 24f0a5c..e4b2e83 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,2 +1,4 @@ # Add here internal Software Heritage dependencies, one per line. 
swh.core +swh.model +swh.storage \ No newline at end of file diff --git a/revisions.py b/revisions.py index eff0fcf..ccadf0e 100644 --- a/revisions.py +++ b/revisions.py @@ -1,62 +1,69 @@ import io import random import pytz from datetime import datetime -from swh.model.hashutil import (hash_to_bytes, hash_to_hex) +from swh.model.hashutil import hash_to_bytes, hash_to_hex from swh.storage import get_storage from swh.provenance.revision import RevisionEntry def rev_to_csv(revision: RevisionEntry): - return ','.join([ - hash_to_hex(revision.id), - str(pytz.utc.localize(revision.date)), - hash_to_hex(revision.root) - ]) + '\n' + return ( + ",".join( + [ + hash_to_hex(revision.id), + str(pytz.utc.localize(revision.date)), + hash_to_hex(revision.root), + ] + ) + + "\n" + ) if __name__ == "__main__": conninfo = { "cls": "remote", - "url": "http://uffizi.internal.softwareheritage.org:5002" + "url": "http://uffizi.internal.softwareheritage.org:5002", } storage = get_storage(**conninfo) revisions = [ # '6eec5815ef8fc88d9fc5bcc91c6465a8899c1445', # 'd1468bb5f06ca44cc42c43fbd011c5dcbdc262c6', # '6a45ebb887d87ee53f359aaeba8a9840576c907b' - '02f95c0a1868cbef82ff73fc1b903183a579c7de', - 'da061f1caf293a5da00bff6a45abcf4d7ae54c50', - 'e3bfd73a9fd8ef3dd4c5b05a927de485f9871323' + "02f95c0a1868cbef82ff73fc1b903183a579c7de", + "da061f1caf293a5da00bff6a45abcf4d7ae54c50", + "e3bfd73a9fd8ef3dd4c5b05a927de485f9871323", ] print(revisions) revisions = list(map(hash_to_bytes, revisions)) print(revisions) entries = [] for revision in storage.revision_get(revisions): if revision is not None: print(revision) - entries.append(RevisionEntry( - storage, - revision.id, - datetime.fromtimestamp(revision.date.timestamp.seconds), - revision.directory - )) + entries.append( + RevisionEntry( + storage, + revision.id, + datetime.fromtimestamp(revision.date.timestamp.seconds), + revision.directory, + ) + ) random.shuffle(entries) - with io.open('random.csv', 'w') as outfile: + with io.open("random.csv", "w") as outfile: for revision in entries: outfile.write(rev_to_csv(revision)) - with io.open('ordered.csv', 'w') as outfile: + with io.open("ordered.csv", "w") as outfile: for revision in sorted(entries, key=lambda rev: rev.date): outfile.write(rev_to_csv(revision)) - with io.open('reverse.csv', 'w') as outfile: + with io.open("reverse.csv", "w") as outfile: for revision in sorted(entries, key=lambda rev: rev.date, reverse=True): outfile.write(rev_to_csv(revision)) diff --git a/swh/provenance/cli.py b/swh/provenance/cli.py index 5fca354..2f2d8dd 100644 --- a/swh/provenance/cli.py +++ b/swh/provenance/cli.py @@ -1,186 +1,197 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # WARNING: do not import unnecessary things here to keep cli startup time under # control import os from typing import Any, Dict, Optional import click import yaml from swh.core import config from swh.core.cli import CONTEXT_SETTINGS from swh.core.cli import swh as swh_cli_group -from swh.model.hashutil import (hash_to_bytes, hash_to_hex) +from swh.model.hashutil import hash_to_bytes, hash_to_hex # All generic config code should reside in swh.core.config CONFIG_ENVVAR = "SWH_CONFIG_FILE" DEFAULT_CONFIG_PATH = os.path.join(click.get_app_dir("swh"), "global.yml") DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, DEFAULT_CONFIG_PATH) DEFAULT_CONFIG: Dict[str, Any] = { 
"archive": { "cls": "api", "storage": { "cls": "remote", - "url": "http://uffizi.internal.softwareheritage.org:5002" + "url": "http://uffizi.internal.softwareheritage.org:5002", } # "cls": "ps", # "db": { # "host": "db.internal.softwareheritage.org", # "dbname": "softwareheritage", # "user": "guest" # } }, - "provenance": { - "cls": "ps", - "db": { - "host": "localhost", - "dbname": "provenance" - } - } + "provenance": {"cls": "ps", "db": {"host": "localhost", "dbname": "provenance"}}, } CONFIG_FILE_HELP = f"""Configuration file: \b The CLI option or the environment variable will fail if invalid. CLI option is checked first. Then, environment variable {CONFIG_ENVVAR} is checked. Then, if cannot load the default path, a set of default values are used. Default config path is {DEFAULT_CONFIG_PATH}. Default config values are: \b {yaml.dump(DEFAULT_CONFIG)}""" PROVENANCE_HELP = f"""Software Heritage Scanner tools. {CONFIG_FILE_HELP}""" @swh_cli_group.group( - name="provenance", context_settings=CONTEXT_SETTINGS, help=PROVENANCE_HELP, + name="provenance", + context_settings=CONTEXT_SETTINGS, + help=PROVENANCE_HELP, ) @click.option( "-C", "--config-file", default=None, type=click.Path(exists=False, dir_okay=False, path_type=str), help="""YAML configuration file""", ) @click.option("--profile", default=None) @click.pass_context def cli(ctx, config_file: Optional[str], profile: str): if config_file is None and config.config_exists(DEFAULT_PATH): config_file = DEFAULT_PATH if config_file is None: conf = DEFAULT_CONFIG else: # read_raw_config do not fail on ENOENT if not config.config_exists(config_file): raise FileNotFoundError(config_file) conf = config.read_raw_config(config.config_basepath(config_file)) conf = config.merge_configs(DEFAULT_CONFIG, conf) ctx.ensure_object(dict) ctx.obj["config"] = conf if profile: import cProfile import atexit print("Profiling...") pr = cProfile.Profile() pr.enable() def exit(): pr.disable() pr.dump_stats(profile) atexit.register(exit) @cli.command(name="create") @click.option("--name", default=None) @click.pass_context def create(ctx, name): """Create new provenance database.""" from .postgresql.db_utils import connect from .postgresql.provenance import create_database # Connect to server without selecting a database conninfo = ctx.obj["config"]["provenance"]["db"] conn = connect(conninfo) create_database(conn, conninfo, name) @cli.command(name="iter-revisions") @click.argument("filename") -@click.option('-l', '--limit', type=int) +@click.option("-l", "--limit", type=int) @click.pass_context def iter_revisions(ctx, filename, limit): - """Iterate over provided list of revisions and add them to the provenance database.""" + """Process a provided list of revisions.""" from . import get_archive, get_provenance from .revision import FileRevisionIterator from .provenance import revision_add archive = get_archive(**ctx.obj["config"]["archive"]) provenance = get_provenance(**ctx.obj["config"]["provenance"]) revisions = FileRevisionIterator(filename, archive, limit=limit) while True: revision = revisions.next() - if revision is None: break + if revision is None: + break revision_add(provenance, archive, revision) @cli.command(name="iter-origins") @click.argument("filename") -@click.option('-l', '--limit', type=int) +@click.option("-l", "--limit", type=int) @click.pass_context def iter_origins(ctx, filename, limit): - """Iterate over provided list of revisions and add them to the provenance database.""" + """Process a provided list of origins.""" from . 
import get_archive, get_provenance from .origin import FileOriginIterator from .provenance import origin_add archive = get_archive(**ctx.obj["config"]["archive"]) provenance = get_provenance(**ctx.obj["config"]["provenance"]) for origin in FileOriginIterator(filename, archive, limit=limit): origin_add(provenance, origin) @cli.command(name="find-first") @click.argument("swhid") @click.pass_context def find_first(ctx, swhid): """Find first occurrence of the requested blob.""" from . import get_provenance provenance = get_provenance(**ctx.obj["config"]["provenance"]) # TODO: return a dictionary with proper keys for each field row = provenance.content_find_first(hash_to_bytes(swhid)) if row is not None: - print(f'{hash_to_hex(row[0])}, {hash_to_hex(row[1])}, {row[2]}, {os.fsdecode(row[3])}') + print( + "{blob}, {rev}, {date}, {path}".format( + blob=hash_to_hex(row[0]), + rev=hash_to_hex(row[1]), + date=row[2], + path=os.fsdecode(row[3]), + ) + ) else: - print(f'Cannot find a content with the id {swhid}') + print(f"Cannot find a content with the id {swhid}") @cli.command(name="find-all") @click.argument("swhid") @click.pass_context def find_all(ctx, swhid): """Find all occurrences of the requested blob.""" from swh.provenance import get_provenance provenance = get_provenance(**ctx.obj["config"]["provenance"]) # TODO: return a dictionary with proper keys for each field for row in provenance.content_find_all(hash_to_bytes(swhid)): - print(f'{hash_to_hex(row[0])}, {hash_to_hex(row[1])}, {row[2]}, {os.fsdecode(row[3])}') + print( + "{blob}, {rev}, {date}, {path}".format( + blob=hash_to_hex(row[0]), + rev=hash_to_hex(row[1]), + date=row[2], + path=os.fsdecode(row[3]), + ) + ) diff --git a/swh/provenance/model.py b/swh/provenance/model.py index 4889b18..a04d8e8 100644 --- a/swh/provenance/model.py +++ b/swh/provenance/model.py @@ -1,46 +1,49 @@ import os from .archive import ArchiveInterface from pathlib import PosixPath # class Tree: # def __init__(self, archive: ArchiveInterface, id: bytes): # self.root = DirectoryEntry(archive, id, PosixPath('.')) class TreeEntry: def __init__(self, id: bytes, name: PosixPath): self.id = id self.name = name class DirectoryEntry(TreeEntry): def __init__(self, archive: ArchiveInterface, id: bytes, name: PosixPath): super().__init__(id, name) self.archive = archive self.children = None def __iter__(self): if self.children is None: self.children = [] for child in self.archive.directory_ls(self.id): - if child['type'] == 'dir': - self.children.append(DirectoryEntry( - self.archive, - child['target'], - PosixPath(os.fsdecode(child['name'])) - )) - - elif child['type'] == 'file': - self.children.append(FileEntry( - child['target'], - PosixPath(os.fsdecode(child['name'])) - )) + if child["type"] == "dir": + self.children.append( + DirectoryEntry( + self.archive, + child["target"], + PosixPath(os.fsdecode(child["name"])), + ) + ) + + elif child["type"] == "file": + self.children.append( + FileEntry( + child["target"], PosixPath(os.fsdecode(child["name"])) + ) + ) return iter(self.children) class FileEntry(TreeEntry): pass diff --git a/swh/provenance/origin.py b/swh/provenance/origin.py index 3bad168..d54d79d 100644 --- a/swh/provenance/origin.py +++ b/swh/provenance/origin.py @@ -1,98 +1,111 @@ from .archive import ArchiveInterface from .revision import RevisionEntry +from typing import Optional + from swh.model.model import Origin, ObjectType, TargetType class OriginEntry: def __init__(self, url, revisions, id=None): self.id = id self.url = url self.revisions = revisions 
################################################################################ ################################################################################ + class OriginIterator: """Iterator interface.""" def __iter__(self): pass def __next__(self): pass class FileOriginIterator(OriginIterator): """Iterator over origins present in the given CSV file.""" - def __init__(self, filename: str, archive: ArchiveInterface, limit: int=None): + def __init__( + self, filename: str, archive: ArchiveInterface, limit: Optional[int] = None + ): self.file = open(filename) self.limit = limit # self.mutex = threading.Lock() self.archive = archive def __iter__(self): yield from iterate_statuses( - [Origin(url.strip()) for url in self.file], - self.archive, - self.limit + [Origin(url.strip()) for url in self.file], self.archive, self.limit ) class ArchiveOriginIterator: """Iterator over origins present in the given storage.""" - def __init__(self, archive: ArchiveInterface, limit: int=None): + def __init__(self, archive: ArchiveInterface, limit: Optional[int] = None): self.limit = limit # self.mutex = threading.Lock() self.archive = archive def __iter__(self): yield from iterate_statuses( - self.archive.iter_origins(), - self.archive, - self.limit + self.archive.iter_origins(), self.archive, self.limit ) -def iterate_statuses(origins, archive: ArchiveInterface, limit: int=None): +def iterate_statuses(origins, archive: ArchiveInterface, limit: Optional[int] = None): idx = 0 for origin in origins: for visit in archive.iter_origin_visits(origin.url): for status in archive.iter_origin_visit_statuses(origin.url, visit.visit): # TODO: may filter only those whose status is 'full'?? targets = [] releases = [] snapshot = archive.snapshot_get_all_branches(status.snapshot) if snapshot is not None: for branch in snapshot.branches: if snapshot.branches[branch].target_type == TargetType.REVISION: targets.append(snapshot.branches[branch].target) - elif snapshot.branches[branch].target_type == TargetType.RELEASE: + elif ( + snapshot.branches[branch].target_type == TargetType.RELEASE + ): releases.append(snapshot.branches[branch].target) - # This is done to keep the query in release_get small, hence avoiding a timeout. + # This is done to keep the query in release_get small, hence avoiding + # a timeout. limit = 100 for i in range(0, len(releases), limit): - for release in archive.release_get(releases[i:i+limit]): - if revision is not None: + for release in archive.release_get(releases[i : i + limit]): + if release is not None: if release.target_type == ObjectType.REVISION: targets.append(release.target) - # This is done to keep the query in revision_get small, hence avoiding a timeout. + # This is done to keep the query in revision_get small, hence avoiding + # a timeout. 
revisions = [] limit = 100 for i in range(0, len(targets), limit): - for revision in archive.revision_get(targets[i:i+limit]): + for revision in archive.revision_get(targets[i : i + limit]): if revision is not None: - parents = list(map(lambda id: RevisionEntry(archive, id), revision.parents)) - revisions.append(RevisionEntry(archive, revision.id, parents=parents)) + parents = list( + map( + lambda id: RevisionEntry(archive, id), + revision.parents, + ) + ) + revisions.append( + RevisionEntry(archive, revision.id, parents=parents) + ) yield OriginEntry(status.origin, revisions) idx = idx + 1 - if idx == limit: return + if idx == limit: + return diff --git a/swh/provenance/postgresql/archive.py b/swh/provenance/postgresql/archive.py index 25b1f18..72734d4 100644 --- a/swh/provenance/postgresql/archive.py +++ b/swh/provenance/postgresql/archive.py @@ -1,65 +1,73 @@ import psycopg2 from ..archive import ArchiveInterface # from functools import lru_cache from methodtools import lru_cache from typing import List class ArchivePostgreSQL(ArchiveInterface): def __init__(self, conn: psycopg2.extensions.connection): self.conn = conn self.cursor = conn.cursor() @lru_cache(maxsize=1000000) def directory_ls(self, id: bytes): - self.cursor.execute('''WITH + self.cursor.execute( + """WITH dir AS (SELECT id AS dir_id, dir_entries, file_entries, rev_entries FROM directory WHERE id=%s), ls_d AS (SELECT dir_id, unnest(dir_entries) AS entry_id from dir), ls_f AS (SELECT dir_id, unnest(file_entries) AS entry_id from dir), ls_r AS (SELECT dir_id, unnest(rev_entries) AS entry_id from dir) - (SELECT 'dir'::directory_entry_type AS type, e.target, e.name, NULL::sha1_git + (SELECT 'dir'::directory_entry_type AS type, e.target, e.name, + NULL::sha1_git FROM ls_d LEFT JOIN directory_entry_dir e ON ls_d.entry_id=e.id) UNION (WITH known_contents AS - (SELECT 'file'::directory_entry_type AS type, e.target, e.name, c.sha1_git - FROM ls_f - LEFT JOIN directory_entry_file e ON ls_f.entry_id=e.id - INNER JOIN content c ON e.target=c.sha1_git) - SELECT * FROM known_contents - UNION - (SELECT 'file'::directory_entry_type AS type, e.target, e.name, c.sha1_git - FROM ls_f - LEFT JOIN directory_entry_file e ON ls_f.entry_id=e.id - LEFT JOIN skipped_content c ON e.target=c.sha1_git - WHERE NOT EXISTS (SELECT 1 FROM known_contents WHERE known_contents.sha1_git=e.target))) + (SELECT 'file'::directory_entry_type AS type, e.target, e.name, + c.sha1_git + FROM ls_f + LEFT JOIN directory_entry_file e ON ls_f.entry_id=e.id + INNER JOIN content c ON e.target=c.sha1_git) + SELECT * FROM known_contents + UNION + (SELECT 'file'::directory_entry_type AS type, e.target, e.name, + c.sha1_git + FROM ls_f + LEFT JOIN directory_entry_file e ON ls_f.entry_id=e.id + LEFT JOIN skipped_content c ON e.target=c.sha1_git + WHERE NOT EXISTS ( + SELECT 1 FROM known_contents + WHERE known_contents.sha1_git=e.target + ) + ) + ) ORDER BY name - ''', (id,)) - return [{'type': row[0], 'target': row[1], 'name': row[2]} for row in self.cursor.fetchall()] - + """, + (id,), + ) + return [ + {"type": row[0], "target": row[1], "name": row[2]} + for row in self.cursor.fetchall() + ] def iter_origins(self): raise NotImplementedError - def iter_origin_visits(self, origin: str): raise NotImplementedError - def iter_origin_visit_statuses(self, origin: str, visit: int): raise NotImplementedError - def release_get(self, ids: List[bytes]): raise NotImplementedError - def revision_get(self, ids: List[bytes]): raise NotImplementedError - def snapshot_get_all_branches(self, 
snapshot: bytes): raise NotImplementedError diff --git a/swh/provenance/postgresql/db_utils.py b/swh/provenance/postgresql/db_utils.py index 003f52b..483f6b1 100644 --- a/swh/provenance/postgresql/db_utils.py +++ b/swh/provenance/postgresql/db_utils.py @@ -1,62 +1,62 @@ import io import psycopg2 from configparser import ConfigParser from pathlib import PosixPath def config(filename: PosixPath, section: str): # create a parser parser = ConfigParser() # read config file parser.read(filename) # get section, default to postgresql db = {} if parser.has_section(section): params = parser.items(section) for param in params: db[param[0]] = param[1] else: - raise Exception(f'Section {section} not found in the {filename} file') + raise Exception(f"Section {section} not found in the {filename} file") return db def typecast_bytea(value, cur): if value is not None: data = psycopg2.BINARY(value, cur) return data.tobytes() def adapt_conn(conn): """Makes psycopg2 use 'bytes' to decode bytea instead of 'memoryview', for this connection.""" t_bytes = psycopg2.extensions.new_type((17,), "bytea", typecast_bytea) psycopg2.extensions.register_type(t_bytes, conn) t_bytes_array = psycopg2.extensions.new_array_type((1001,), "bytea[]", t_bytes) psycopg2.extensions.register_type(t_bytes_array, conn) def connect(params: dict): """ Connect to the PostgreSQL database server """ conn = None try: # connect to the PostgreSQL server conn = psycopg2.connect(**params) adapt_conn(conn) except (Exception, psycopg2.DatabaseError) as error: print(error) return conn def execute_sql(conn: psycopg2.extensions.connection, filename: PosixPath): with io.open(filename) as file: cur = conn.cursor() cur.execute(file.read()) cur.close() conn.commit() diff --git a/swh/provenance/postgresql/provenance.py b/swh/provenance/postgresql/provenance.py index 0005fa8..9a4fe0b 100644 --- a/swh/provenance/postgresql/provenance.py +++ b/swh/provenance/postgresql/provenance.py @@ -1,435 +1,407 @@ import itertools import logging import os import psycopg2 import psycopg2.extras from ..model import DirectoryEntry, FileEntry from ..origin import OriginEntry from .db_utils import connect, execute_sql from ..provenance import ProvenanceInterface from ..revision import RevisionEntry from datetime import datetime from pathlib import PosixPath -from typing import Dict, List +from typing import Any, Dict, List from swh.model.hashutil import hash_to_hex def normalize(path: PosixPath) -> PosixPath: spath = str(path) - if spath.startswith('./'): + if spath.startswith("./"): return PosixPath(spath[2:]) return path -def create_database( - conn: psycopg2.extensions.connection, - conninfo: dict, - name: str -): +def create_database(conn: psycopg2.extensions.connection, conninfo: dict, name: str): conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) # Normalize dbname to avoid issues when reconnecting below name = name.casefold() # Create new database dropping previous one if exists cursor = conn.cursor() - cursor.execute(f'''DROP DATABASE IF EXISTS {name}''') - cursor.execute(f'''CREATE DATABASE {name}''') + cursor.execute(f"""DROP DATABASE IF EXISTS {name}""") + cursor.execute(f"""CREATE DATABASE {name}""") conn.close() # Reconnect to server selecting newly created database to add tables - conninfo['dbname'] = name + conninfo["dbname"] = name conn = connect(conninfo) sqldir = os.path.dirname(os.path.realpath(__file__)) - execute_sql(conn, os.path.join(sqldir, 'provenance.sql')) + execute_sql(conn, PosixPath(os.path.join(sqldir, "provenance.sql"))) 
################################################################################ ################################################################################ ################################################################################ + class ProvenancePostgreSQL(ProvenanceInterface): def __init__(self, conn: psycopg2.extensions.connection): # TODO: consider adding a mutex for thread safety conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) self.conn = conn self.cursor = self.conn.cursor() - self.insert_cache = None - self.select_cache = None + self.insert_cache: Dict[str, Any] = {} + self.select_cache: Dict[str, Any] = {} self.clear_caches() - def clear_caches(self): self.insert_cache = { "content": dict(), "content_early_in_rev": list(), "content_in_dir": list(), "directory": dict(), "directory_in_rev": list(), "revision": dict(), "revision_before_rev": list(), - "revision_in_org": list() - } - self.select_cache = { - "content": dict(), - "directory": dict(), - "revision": dict() + "revision_in_org": list(), } - + self.select_cache = {"content": dict(), "directory": dict(), "revision": dict()} def commit(self): result = False try: self.insert_all() # self.conn.commit() result = True # except psycopg2.DatabaseError: # # Database error occurred, rollback all changes # self.conn.rollback() # # TODO: maybe serialize and auto-merge transations. # # The only conflicts are on: # # - content: we keep the earliest date # # - directory: we keep the earliest date # # - content_in_dir: there should be just duplicated entries. except Exception as error: # Unexpected error occurred, rollback all changes and log message - logging.warning(f'Unexpected error: {error}') + logging.warning(f"Unexpected error: {error}") # self.conn.rollback() finally: self.clear_caches() return result - def content_add_to_directory( - self, - directory: DirectoryEntry, - blob: FileEntry, - prefix: PosixPath + self, directory: DirectoryEntry, blob: FileEntry, prefix: PosixPath ): - # logging.debug(f'NEW occurrence of content {hash_to_hex(blob.id)} in directory {hash_to_hex(directory.id)} (path: {prefix / blob.name})') - # self.cursor.execute('''INSERT INTO content_in_dir VALUES (%s,%s,%s)''', - # (blob.id, directory.id, bytes(normalize(prefix / blob.name)))) - self.insert_cache['content_in_dir'].append( + self.insert_cache["content_in_dir"].append( (blob.id, directory.id, bytes(normalize(prefix / blob.name))) ) - def content_add_to_revision( - self, - revision: RevisionEntry, - blob: FileEntry, - prefix: PosixPath + self, revision: RevisionEntry, blob: FileEntry, prefix: PosixPath ): - # logging.debug(f'EARLY occurrence of blob {hash_to_hex(blob.id)} in revision {hash_to_hex(revision.id)} (path: {prefix / blob.name})') - # self.cursor.execute('''INSERT INTO content_early_in_rev VALUES (%s,%s,%s)''', - # (blob.id, revision.id, bytes(normalize(prefix / blob.name)))) - self.insert_cache['content_early_in_rev'].append( + self.insert_cache["content_early_in_rev"].append( (blob.id, revision.id, bytes(normalize(prefix / blob.name))) ) - def content_find_first(self, blobid: str): - logging.info(f'Retrieving first occurrence of content {hash_to_hex(blobid)}') - self.cursor.execute('''SELECT blob, rev, date, path - FROM content_early_in_rev JOIN revision ON revision.id=content_early_in_rev.rev - WHERE content_early_in_rev.blob=%s ORDER BY date, rev, path ASC LIMIT 1''', (blobid,)) + logging.info(f"Retrieving first occurrence of content {hash_to_hex(blobid)}") + self.cursor.execute( + """SELECT blob, rev, 
date, path + FROM content_early_in_rev + JOIN revision ON revision.id=content_early_in_rev.rev + WHERE content_early_in_rev.blob=%s + ORDER BY date, rev, path ASC LIMIT 1""", + (blobid,), + ) return self.cursor.fetchone() - def content_find_all(self, blobid: str): - logging.info(f'Retrieving all occurrences of content {hash_to_hex(blobid)}') - self.cursor.execute('''(SELECT blob, rev, date, path - FROM content_early_in_rev JOIN revision ON revision.id=content_early_in_rev.rev - WHERE content_early_in_rev.blob=%s) - UNION - (SELECT content_in_rev.blob, content_in_rev.rev, revision.date, content_in_rev.path - FROM (SELECT content_in_dir.blob, directory_in_rev.rev, - CASE directory_in_rev.path - WHEN '.' THEN content_in_dir.path - ELSE (directory_in_rev.path || '/' || content_in_dir.path)::unix_path - END AS path - FROM content_in_dir - JOIN directory_in_rev ON content_in_dir.dir=directory_in_rev.dir - WHERE content_in_dir.blob=%s) AS content_in_rev - JOIN revision ON revision.id=content_in_rev.rev) - ORDER BY date, rev, path''', (blobid, blobid)) - # POSTGRESQL EXPLAIN + logging.info(f"Retrieving all occurrences of content {hash_to_hex(blobid)}") + self.cursor.execute( + """(SELECT blob, rev, date, path + FROM content_early_in_rev + JOIN revision ON revision.id=content_early_in_rev.rev + WHERE content_early_in_rev.blob=%s) + UNION + (SELECT content_in_rev.blob, content_in_rev.rev, revision.date, + content_in_rev.path + FROM (SELECT content_in_dir.blob, directory_in_rev.rev, + CASE directory_in_rev.path + WHEN '.' THEN content_in_dir.path + ELSE (directory_in_rev.path || '/' || + content_in_dir.path)::unix_path + END AS path + FROM content_in_dir + JOIN directory_in_rev + ON content_in_dir.dir=directory_in_rev.dir + WHERE content_in_dir.blob=%s + ) AS content_in_rev + JOIN revision ON revision.id=content_in_rev.rev + ) + ORDER BY date, rev, path""", + (blobid, blobid), + ) + # POSTGRESQL EXPLAIN yield from self.cursor.fetchall() - def content_get_early_date(self, blob: FileEntry) -> datetime: - logging.debug(f'Getting content {hash_to_hex(blob.id)} early date') + logging.debug(f"Getting content {hash_to_hex(blob.id)} early date") # First check if the date is being modified by current transection. - date = self.insert_cache['content'].get(blob.id, None) + date = self.insert_cache["content"].get(blob.id, None) if date is None: # If not, check whether it's been query before - date = self.select_cache['content'].get(blob.id, None) + date = self.select_cache["content"].get(blob.id, None) if date is None: # Otherwise, query the database and cache the value - self.cursor.execute('''SELECT date FROM content WHERE id=%s''', - (blob.id,)) + self.cursor.execute( + """SELECT date FROM content WHERE id=%s""", (blob.id,) + ) row = self.cursor.fetchone() date = row[0] if row is not None else None - self.select_cache['content'][blob.id] = date + self.select_cache["content"][blob.id] = date return date - def content_get_early_dates(self, blobs: List[FileEntry]) -> Dict[bytes, datetime]: dates = {} pending = [] for blob in blobs: # First check if the date is being modified by current transection. 
- date = self.insert_cache['content'].get(blob.id, None) + date = self.insert_cache["content"].get(blob.id, None) if date is not None: dates[blob.id] = date else: # If not, check whether it's been query before - date = self.select_cache['content'].get(blob.id, None) + date = self.select_cache["content"].get(blob.id, None) if date is not None: dates[blob.id] = date else: pending.append(blob.id) if pending: # Otherwise, query the database and cache the values - values = ', '.join(itertools.repeat('%s', len(pending))) - self.cursor.execute(f'''SELECT id, date FROM content WHERE id IN ({values})''', - tuple(pending)) + values = ", ".join(itertools.repeat("%s", len(pending))) + self.cursor.execute( + f"""SELECT id, date FROM content WHERE id IN ({values})""", + tuple(pending), + ) for row in self.cursor.fetchall(): dates[row[0]] = row[1] - self.select_cache['content'][row[0]] = row[1] + self.select_cache["content"][row[0]] = row[1] return dates - def content_set_early_date(self, blob: FileEntry, date: datetime): - # logging.debug(f'EARLY occurrence of blob {hash_to_hex(blob.id)} (timestamp: {date})') - # self.cursor.execute('''INSERT INTO content VALUES (%s,%s) - # ON CONFLICT (id) DO UPDATE SET date=%s''', - # (blob.id, date, date)) - self.insert_cache['content'][blob.id] = date - + self.insert_cache["content"][blob.id] = date def directory_add_to_revision( - self, - revision: RevisionEntry, - directory: DirectoryEntry, - path: PosixPath + self, revision: RevisionEntry, directory: DirectoryEntry, path: PosixPath ): - # logging.debug(f'NEW occurrence of directory {hash_to_hex(directory.id)} on the ISOCHRONE FRONTIER of revision {hash_to_hex(revision.id)} (path: {path})') - # self.cursor.execute('''INSERT INTO directory_in_rev VALUES (%s,%s,%s)''', - # (directory.id, revision.id, bytes(normalize(path)))) - self.insert_cache['directory_in_rev'].append( + self.insert_cache["directory_in_rev"].append( (directory.id, revision.id, bytes(normalize(path))) ) - - def directory_get_date_in_isochrone_frontier(self, directory: DirectoryEntry) -> datetime: - # logging.debug(f'Getting directory {hash_to_hex(directory.id)} early date') + def directory_get_date_in_isochrone_frontier( + self, directory: DirectoryEntry + ) -> datetime: # First check if the date is being modified by current transection. - date = self.insert_cache['directory'].get(directory.id, None) + date = self.insert_cache["directory"].get(directory.id, None) if date is None: # If not, check whether it's been query before - date = self.select_cache['directory'].get(directory.id, None) + date = self.select_cache["directory"].get(directory.id, None) if date is None: # Otherwise, query the database and cache the value - self.cursor.execute('''SELECT date FROM directory WHERE id=%s''', - (directory.id,)) + self.cursor.execute( + """SELECT date FROM directory WHERE id=%s""", (directory.id,) + ) row = self.cursor.fetchone() date = row[0] if row is not None else None - self.select_cache['directory'][directory.id] = date + self.select_cache["directory"][directory.id] = date return date - - def directory_get_early_dates(self, dirs: List[DirectoryEntry]) -> Dict[bytes, datetime]: + def directory_get_early_dates( + self, dirs: List[DirectoryEntry] + ) -> Dict[bytes, datetime]: dates = {} pending = [] for dir in dirs: # First check if the date is being modified by current transection. 
- date = self.insert_cache['directory'].get(dir.id, None) + date = self.insert_cache["directory"].get(dir.id, None) if date is not None: dates[dir.id] = date else: # If not, check whether it's been query before - date = self.select_cache['directory'].get(dir.id, None) + date = self.select_cache["directory"].get(dir.id, None) if date is not None: dates[dir.id] = date else: pending.append(dir.id) if pending: # Otherwise, query the database and cache the values - values = ', '.join(itertools.repeat('%s', len(pending))) - self.cursor.execute(f'''SELECT id, date FROM directory WHERE id IN ({values})''', - tuple(pending)) + values = ", ".join(itertools.repeat("%s", len(pending))) + self.cursor.execute( + f"""SELECT id, date FROM directory WHERE id IN ({values})""", + tuple(pending), + ) for row in self.cursor.fetchall(): dates[row[0]] = row[1] - self.select_cache['directory'][row[0]] = row[1] + self.select_cache["directory"][row[0]] = row[1] return dates - - def directory_set_date_in_isochrone_frontier(self, directory: DirectoryEntry, date: datetime): - # logging.debug(f'EARLY occurrence of directory {hash_to_hex(directory.id)} on the ISOCHRONE FRONTIER (timestamp: {date})') - # self.cursor.execute('''INSERT INTO directory VALUES (%s,%s) - # ON CONFLICT (id) DO UPDATE SET date=%s''', - # (directory.id, date, date)) - self.insert_cache['directory'][directory.id] = date - + def directory_set_date_in_isochrone_frontier( + self, directory: DirectoryEntry, date: datetime + ): + self.insert_cache["directory"][directory.id] = date def insert_all(self): # Performe insertions with cached information - if self.insert_cache['content']: + if self.insert_cache["content"]: psycopg2.extras.execute_values( self.cursor, - '''INSERT INTO content(id, date) VALUES %s + """INSERT INTO content(id, date) VALUES %s ON CONFLICT (id) DO - UPDATE SET date=LEAST(EXCLUDED.date,content.date)''', - self.insert_cache['content'].items() + UPDATE SET date=LEAST(EXCLUDED.date,content.date)""", + self.insert_cache["content"].items(), ) - if self.insert_cache['content_early_in_rev']: + if self.insert_cache["content_early_in_rev"]: psycopg2.extras.execute_values( self.cursor, - '''INSERT INTO content_early_in_rev VALUES %s - ON CONFLICT DO NOTHING''', - self.insert_cache['content_early_in_rev'] + """INSERT INTO content_early_in_rev VALUES %s + ON CONFLICT DO NOTHING""", + self.insert_cache["content_early_in_rev"], ) - if self.insert_cache['content_in_dir']: + if self.insert_cache["content_in_dir"]: psycopg2.extras.execute_values( self.cursor, - '''INSERT INTO content_in_dir VALUES %s - ON CONFLICT DO NOTHING''', - self.insert_cache['content_in_dir'] + """INSERT INTO content_in_dir VALUES %s + ON CONFLICT DO NOTHING""", + self.insert_cache["content_in_dir"], ) - if self.insert_cache['directory']: + if self.insert_cache["directory"]: psycopg2.extras.execute_values( self.cursor, - '''INSERT INTO directory(id, date) VALUES %s + """INSERT INTO directory(id, date) VALUES %s ON CONFLICT (id) DO - UPDATE SET date=LEAST(EXCLUDED.date,directory.date)''', - self.insert_cache['directory'].items() + UPDATE SET date=LEAST(EXCLUDED.date,directory.date)""", + self.insert_cache["directory"].items(), ) - if self.insert_cache['directory_in_rev']: + if self.insert_cache["directory_in_rev"]: psycopg2.extras.execute_values( self.cursor, - '''INSERT INTO directory_in_rev VALUES %s - ON CONFLICT DO NOTHING''', - self.insert_cache['directory_in_rev'] + """INSERT INTO directory_in_rev VALUES %s + ON CONFLICT DO NOTHING""", + 
self.insert_cache["directory_in_rev"], ) - if self.insert_cache['revision']: + if self.insert_cache["revision"]: psycopg2.extras.execute_values( self.cursor, - '''INSERT INTO revision(id, date) VALUES %s + """INSERT INTO revision(id, date) VALUES %s ON CONFLICT (id) DO - UPDATE SET date=LEAST(EXCLUDED.date,revision.date)''', - self.insert_cache['revision'].items() + UPDATE SET date=LEAST(EXCLUDED.date,revision.date)""", + self.insert_cache["revision"].items(), ) - if self.insert_cache['revision_before_rev']: + if self.insert_cache["revision_before_rev"]: psycopg2.extras.execute_values( self.cursor, - '''INSERT INTO revision_before_rev VALUES %s - ON CONFLICT DO NOTHING''', - self.insert_cache['revision_before_rev'] + """INSERT INTO revision_before_rev VALUES %s + ON CONFLICT DO NOTHING""", + self.insert_cache["revision_before_rev"], ) - if self.insert_cache['revision_in_org']: + if self.insert_cache["revision_in_org"]: psycopg2.extras.execute_values( self.cursor, - '''INSERT INTO revision_in_org VALUES %s - ON CONFLICT DO NOTHING''', - self.insert_cache['revision_in_org'] + """INSERT INTO revision_in_org VALUES %s + ON CONFLICT DO NOTHING""", + self.insert_cache["revision_in_org"], ) - def origin_get_id(self, origin: OriginEntry) -> int: if origin.id is None: # Check if current origin is already known and retrieve its internal id. - self.cursor.execute('''SELECT id FROM origin WHERE url=%s''', (origin.url,)) + self.cursor.execute("""SELECT id FROM origin WHERE url=%s""", (origin.url,)) row = self.cursor.fetchone() if row is None: # If the origin is seen for the first time, current revision is # the prefered one. - self.cursor.execute('''INSERT INTO origin (url) VALUES (%s) RETURNING id''', - (origin.url,)) + self.cursor.execute( + """INSERT INTO origin (url) VALUES (%s) RETURNING id""", + (origin.url,), + ) return self.cursor.fetchone()[0] else: return row[0] else: return origin.id - def revision_add(self, revision: RevisionEntry): # Add current revision to the compact DB - self.insert_cache['revision'][revision.id] = revision.date - - - def revision_add_before_revision(self, relative: RevisionEntry, revision: RevisionEntry): - # self.cursor.execute('''INSERT INTO revision_before_rev VALUES (%s,%s) - # ON CONFLICT DO NOTHING''', - # (revision.id, relative.id)) - self.insert_cache['revision_before_rev'].append((revision.id, relative.id)) + self.insert_cache["revision"][revision.id] = revision.date + def revision_add_before_revision( + self, relative: RevisionEntry, revision: RevisionEntry + ): + self.insert_cache["revision_before_rev"].append((revision.id, relative.id)) def revision_add_to_origin(self, origin: OriginEntry, revision: RevisionEntry): - # self.cursor.execute('''INSERT INTO revision_in_org VALUES (%s,%s) - # ON CONFLICT DO NOTHING''', - # (revision.id, origin.id)) - self.insert_cache['revision_in_org'].append((revision.id, origin.id)) - + self.insert_cache["revision_in_org"].append((revision.id, origin.id)) def revision_get_early_date(self, revision: RevisionEntry) -> datetime: - # logging.debug(f'Getting revision {hash_to_hex(revision.id)} early date') - # First check if the date is being modified by current transection. 
- date = self.insert_cache['revision'].get(revision.id, None) + date = self.insert_cache["revision"].get(revision.id, None) if date is None: # If not, check whether it's been query before - date = self.select_cache['revision'].get(revision.id, None) + date = self.select_cache["revision"].get(revision.id, None) if date is None: # Otherwise, query the database and cache the value - self.cursor.execute('''SELECT date FROM revision WHERE id=%s''', - (revision.id,)) + self.cursor.execute( + """SELECT date FROM revision WHERE id=%s""", (revision.id,) + ) row = self.cursor.fetchone() date = row[0] if row is not None else None - self.select_cache['revision'][revision.id] = date + self.select_cache["revision"][revision.id] = date return date - def revision_get_prefered_origin(self, revision: RevisionEntry) -> int: # TODO: adapt this method to consider cached values - self.cursor.execute('''SELECT COALESCE(org,0) FROM revision WHERE id=%s''', - (revision.id,)) + self.cursor.execute( + """SELECT COALESCE(org,0) FROM revision WHERE id=%s""", (revision.id,) + ) row = self.cursor.fetchone() # None means revision is not in database # 0 means revision has no prefered origin return row[0] if row is not None and row[0] != 0 else None - def revision_in_history(self, revision: RevisionEntry) -> bool: # TODO: adapt this method to consider cached values - self.cursor.execute('''SELECT 1 FROM revision_before_rev WHERE prev=%s''', - (revision.id,)) + self.cursor.execute( + """SELECT 1 FROM revision_before_rev WHERE prev=%s""", (revision.id,) + ) return self.cursor.fetchone() is not None - - def revision_set_prefered_origin(self, origin: OriginEntry, revision: RevisionEntry): + def revision_set_prefered_origin( + self, origin: OriginEntry, revision: RevisionEntry + ): # TODO: adapt this method to consider cached values - self.cursor.execute('''UPDATE revision SET org=%s WHERE id=%s''', - (origin.id, revision.id)) - + self.cursor.execute( + """UPDATE revision SET org=%s WHERE id=%s""", (origin.id, revision.id) + ) def revision_visited(self, revision: RevisionEntry) -> bool: # TODO: adapt this method to consider cached values - self.cursor.execute('''SELECT 1 FROM revision_in_org WHERE rev=%s''', - (revision.id,)) + self.cursor.execute( + """SELECT 1 FROM revision_in_org WHERE rev=%s""", (revision.id,) + ) return self.cursor.fetchone() is not None diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py index 2252024..fb1ae56 100644 --- a/swh/provenance/provenance.py +++ b/swh/provenance/provenance.py @@ -1,310 +1,336 @@ -import logging - from .archive import ArchiveInterface from .model import DirectoryEntry, FileEntry from .origin import OriginEntry from .revision import RevisionEntry from datetime import datetime from pathlib import PosixPath -from typing import Dict, List - -from swh.model.hashutil import hash_to_hex +from typing import Dict, List, Optional, Tuple class ProvenanceInterface: def __init__(self, **kwargs): raise NotImplementedError - def commit(self): raise NotImplementedError - - def content_add_to_directory(self, directory: DirectoryEntry, blob: FileEntry, prefix: PosixPath): + def content_add_to_directory( + self, directory: DirectoryEntry, blob: FileEntry, prefix: PosixPath + ): raise NotImplementedError - - def content_add_to_revision(self, revision: RevisionEntry, blob: FileEntry, prefix: PosixPath): + def content_add_to_revision( + self, revision: RevisionEntry, blob: FileEntry, prefix: PosixPath + ): raise NotImplementedError - def content_find_first(self, blobid: str): 
raise NotImplementedError - def content_find_all(self, blobid: str): raise NotImplementedError - def content_get_early_date(self, blob: FileEntry) -> datetime: raise NotImplementedError - def content_get_early_dates(self, blobs: List[FileEntry]) -> Dict[bytes, datetime]: raise NotImplementedError - def content_set_early_date(self, blob: FileEntry, date: datetime): raise NotImplementedError - - def directory_add_to_revision(self, revision: RevisionEntry, directory: DirectoryEntry, path: PosixPath): + def directory_add_to_revision( + self, revision: RevisionEntry, directory: DirectoryEntry, path: PosixPath + ): raise NotImplementedError - - def directory_get_date_in_isochrone_frontier(self, directory: DirectoryEntry) -> datetime: + def directory_get_date_in_isochrone_frontier( + self, directory: DirectoryEntry + ) -> datetime: raise NotImplementedError - - def directory_get_early_dates(self, dirs: List[DirectoryEntry]) -> Dict[bytes, datetime]: + def directory_get_early_dates( + self, dirs: List[DirectoryEntry] + ) -> Dict[bytes, datetime]: raise NotImplementedError - - def directory_set_date_in_isochrone_frontier(self, directory: DirectoryEntry, date: datetime): + def directory_set_date_in_isochrone_frontier( + self, directory: DirectoryEntry, date: datetime + ): raise NotImplementedError - def origin_get_id(self, origin: OriginEntry) -> int: raise NotImplementedError - def revision_add(self, revision: RevisionEntry): raise NotImplementedError - - def revision_add_before_revision(self, relative: RevisionEntry, revision: RevisionEntry): + def revision_add_before_revision( + self, relative: RevisionEntry, revision: RevisionEntry + ): raise NotImplementedError - def revision_add_to_origin(self, origin: OriginEntry, revision: RevisionEntry): raise NotImplementedError - def revision_get_early_date(self, revision: RevisionEntry) -> datetime: raise NotImplementedError - def revision_get_prefered_origin(self, revision: RevisionEntry) -> int: raise NotImplementedError - def revision_in_history(self, revision: RevisionEntry) -> bool: raise NotImplementedError - - def revision_set_prefered_origin(self, origin: OriginEntry, revision: RevisionEntry): + def revision_set_prefered_origin( + self, origin: OriginEntry, revision: RevisionEntry + ): raise NotImplementedError - def revision_visited(self, revision: RevisionEntry) -> bool: raise NotImplementedError def directory_process_content( provenance: ProvenanceInterface, directory: DirectoryEntry, relative: DirectoryEntry, - prefix: PosixPath + prefix: PosixPath, ): stack = [(directory, prefix)] while stack: - dir, path = stack.pop() + current, path = stack.pop() - for child in iter(dir): + for child in iter(current): if isinstance(child, FileEntry): # Add content to the relative directory with the computed path. provenance.content_add_to_directory(relative, child, path) else: # Recursively walk the child directory. stack.append((child, path / child.name)) -def origin_add( +def directory_update_content( + stack: List[Tuple[DirectoryEntry, PosixPath]], provenance: ProvenanceInterface, - origin: OriginEntry + revision: RevisionEntry, + directory: DirectoryEntry, + path: PosixPath, + subdirs: Optional[List[DirectoryEntry]] = None, + blobs: Optional[List[FileEntry]] = None, + blobdates: Optional[Dict[bytes, datetime]] = None, ): + assert revision.date is not None + + # Init optional parameters if not provided. 
+ if subdirs is None: + subdirs = [ + child for child in iter(directory) if isinstance(child, DirectoryEntry) + ] + + if blobs is None: + blobs = [child for child in iter(directory) if isinstance(child, FileEntry)] + + if blobdates is None: + blobdates = provenance.content_get_early_dates(blobs) + + # Iterate over blobs updating their date if necessary. + for blob in blobs: + date = blobdates.get(blob.id, None) + if date is None or revision.date < date: + provenance.content_set_early_date(blob, revision.date) + provenance.content_add_to_revision(revision, blob, path) + + # Push all subdirectories with its corresponding path to analyze them + # recursively. + for subdir in subdirs: + stack.append((subdir, path / subdir.name)) + + +def origin_add(provenance: ProvenanceInterface, origin: OriginEntry): + # TODO: refactor to iterate over origin visit statuses and commit only once + # per status. origin.id = provenance.origin_get_id(origin) for revision in origin.revisions: - # logging.info(f'Processing revision {hash_to_hex(revision.id)} from origin {origin.url}') origin_add_revision(provenance, origin, revision) # Commit after each revision - provenance.commit() # TODO: verify this! + provenance.commit() # TODO: verify this! def origin_add_revision( - provenance: ProvenanceInterface, - origin: OriginEntry, - revision: RevisionEntry + provenance: ProvenanceInterface, origin: OriginEntry, revision: RevisionEntry ): - stack = [(None, revision)] + stack: List[Tuple[Optional[RevisionEntry], RevisionEntry]] = [(None, revision)] while stack: - relative, rev = stack.pop() + relative, current = stack.pop() # Check if current revision has no prefered origin and update if necessary. - prefered = provenance.revision_get_prefered_origin(rev) - # logging.debug(f'Prefered origin for revision {hash_to_hex(rev.id)}: {prefered}') + prefered = provenance.revision_get_prefered_origin(current) if prefered is None: - provenance.revision_set_prefered_origin(origin, rev) + provenance.revision_set_prefered_origin(origin, current) ######################################################################## if relative is None: # This revision is pointed directly by the origin. - visited = provenance.revision_visited(rev) - logging.debug(f'Revision {hash_to_hex(rev.id)} in origin {origin.id}: {visited}') - - logging.debug(f'Adding revision {hash_to_hex(rev.id)} to origin {origin.id}') - provenance.revision_add_to_origin(origin, rev) + visited = provenance.revision_visited(current) + provenance.revision_add_to_origin(origin, current) if not visited: - stack.append((rev, rev)) + stack.append((current, current)) else: # This revision is a parent of another one in the history of the # relative revision. - for parent in iter(rev): + for parent in iter(current): visited = provenance.revision_visited(parent) - logging.debug(f'Parent {hash_to_hex(parent.id)} in some origin: {visited}') if not visited: # The parent revision has never been seen before pointing # directly to an origin. known = provenance.revision_in_history(parent) - logging.debug(f'Revision {hash_to_hex(parent.id)} before revision: {known}') if known: # The parent revision is already known in some other # revision's history. We should point it directly to # the origin and (eventually) walk its history. - logging.debug(f'Adding revision {hash_to_hex(parent.id)} directly to origin {origin.id}') stack.append((None, parent)) else: # The parent revision was never seen before. We should # walk its history and associate it with the same # relative revision. 
- logging.debug(f'Adding parent revision {hash_to_hex(parent.id)} to revision {hash_to_hex(relative.id)}') provenance.revision_add_before_revision(relative, parent) stack.append((relative, parent)) else: # The parent revision already points to an origin, so its # history was properly processed before. We just need to # make sure it points to the current origin as well. - logging.debug(f'Adding parent revision {hash_to_hex(parent.id)} to origin {origin.id}') provenance.revision_add_to_origin(origin, parent) def revision_add( - provenance: ProvenanceInterface, - archive: ArchiveInterface, - revision: RevisionEntry + provenance: ProvenanceInterface, archive: ArchiveInterface, revision: RevisionEntry ): + assert revision.date is not None + assert revision.root is not None + # Processed content starting from the revision's root directory date = provenance.revision_get_early_date(revision) if date is None or revision.date < date: provenance.revision_add(revision) revision_process_content( - provenance, - revision, - DirectoryEntry(archive, revision.root, PosixPath('.')) + provenance, revision, DirectoryEntry(archive, revision.root, PosixPath(".")) ) + return provenance.commit() def revision_process_content( - provenance: ProvenanceInterface, - revision: RevisionEntry, - directory: DirectoryEntry + provenance: ProvenanceInterface, revision: RevisionEntry, directory: DirectoryEntry ): - date = provenance.directory_get_date_in_isochrone_frontier(directory) - stack = [(directory, date, directory.name)] - # stack = [(directory, directory.name)] + assert revision.date is not None + + stack = [(directory, directory.name)] while stack: - dir, date, path = stack.pop() - # dir, path = stack.pop() - # date = provenance.directory_get_date_in_isochrone_frontier(dir) + # Get next directory to process and query its date right before + # processing to be sure we get the most recently updated value. + current, path = stack.pop() + date = provenance.directory_get_date_in_isochrone_frontier(current) if date is None: # The directory has never been seen on the isochrone graph of a # revision. Its children should be checked. - blobs = [child for child in iter(dir) if isinstance(child, FileEntry)] - dirs = [child for child in iter(dir) if isinstance(child, DirectoryEntry)] + blobs = [child for child in iter(current) if isinstance(child, FileEntry)] + dirs = [ + child for child in iter(current) if isinstance(child, DirectoryEntry) + ] blobdates = provenance.content_get_early_dates(blobs) - # TODO: this will only return timestamps for diretories that were - # seen in an isochrone frontier. But a directory may only cointain a - # subdirectory whose contents are already known. Which one should be - # added to the frontier then (the root or the sub directory)? dirdates = provenance.directory_get_early_dates(dirs) + # Get the list of ids with no duplicates to ensure we have + # available dates for all the elements. This prevents taking a + # wrong decision when a blob occurs more than once in the same + # directory. ids = list(dict.fromkeys([child.id for child in blobs + dirs])) if ids: dates = list(blobdates.values()) + list(dirdates.values()) - if len(dates) == len(ids) and max(dates) <= revision.date: - # The directory belongs to the isochrone frontier of the - # current revision, and this is the first time it appears - # as such.
- provenance.directory_set_date_in_isochrone_frontier(dir, max(dates)) - provenance.directory_add_to_revision(revision, dir, path) - directory_process_content( - provenance, - directory=dir, - relative=dir, - prefix=PosixPath('.') - ) + if len(dates) == len(ids): + # All child nodes of current directory are already known. + maxdate = max(dates) + + if maxdate < revision.date: + # The directory belongs to the outer isochrone frontier + # of the current revision, and this is the first time + # it appears as such. + # TODO: maybe the parent directory is the one that has + # to be added to the frontier instead! + provenance.directory_set_date_in_isochrone_frontier( + current, maxdate + ) + provenance.directory_add_to_revision(revision, current, path) + directory_process_content( + provenance, + directory=current, + relative=current, + prefix=PosixPath("."), + ) + + else: + # This revision is either in the inner frontier or out + # of order. All the children from the current directory + # should still be analyzed (and their timestamps + # eventually updated). + directory_update_content( + stack, + provenance, + revision, + current, + path, + subdirs=dirs, + blobs=blobs, + blobdates=blobdates, + ) else: - # The directory is not on the isochrone frontier of the - # current revision. Its child nodes should be analyzed. - ############################################################ - for child in blobs: - date = blobdates.get(child.id, None) - # date = provenance.content_get_early_date(child) - if date is None or revision.date < date: - provenance.content_set_early_date(child, revision.date) - provenance.content_add_to_revision(revision, child, path) - - for child in dirs: - date = dirdates.get(child.id, None) - # date = provenance.directory_get_date_in_isochrone_frontier(child) - stack.append((child, date, path / child.name)) - # stack.append((child, path / child.name)) - ############################################################ + # At least one child node is not yet known, i.e. the directory + # is not on the outer isochrone frontier of the current + # revision. Its child nodes should be analyzed and the current + # directory updated before them. + # FIXME: I believe the only difference between this branch + # and the two 'else' cases above is this push to the stack. + # If so, we might refactor this to avoid so many branches. + # stack.append((current, path)) + # TODO: to uncomment the above line, it is necessary to + # postpone the adding of directories to the isochrone + # frontier from the branch above (maxdate < revision.date). + directory_update_content( + stack, + provenance, + revision, + current, + path, + subdirs=dirs, + blobs=blobs, + blobdates=blobdates, + ) elif revision.date < date: # The directory has already been seen on the isochrone frontier of # a revision, but current revision is earlier. Its children should # be updated.
- blobs = [child for child in iter(dir) if isinstance(child, FileEntry)] - dirs = [child for child in iter(dir) if isinstance(child, DirectoryEntry)] - - blobdates = provenance.content_get_early_dates(blobs) - dirdates = provenance.directory_get_early_dates(dirs) - - #################################################################### - for child in blobs: - # date = blobdates.get(child.id, None) - date = provenance.content_get_early_date(child) - if date is None or revision.date < date: - provenance.content_set_early_date(child, revision.date) - provenance.content_add_to_revision(revision, child, path) - - for child in dirs: - # date = dirdates.get(child.id, None) - date = provenance.directory_get_date_in_isochrone_frontier(child) - stack.append((child, date, path / child.name)) - # stack.append((child, path / child.name)) - #################################################################### - - provenance.directory_set_date_in_isochrone_frontier(dir, revision.date) + directory_update_content(stack, provenance, revision, current, path) + provenance.directory_set_date_in_isochrone_frontier(current, revision.date) else: - # The directory has already been seen on the isochrone frontier of - # an earlier revision. Just add it to the current revision. - provenance.directory_add_to_revision(revision, dir, path) + # The directory has already been seen on the outer isochrone + # frontier of an earlier revision. Just add it to the current + # revision. + provenance.directory_add_to_revision(revision, current, path) diff --git a/swh/provenance/revision.py b/swh/provenance/revision.py index d32a25d..8fdfb5b 100644 --- a/swh/provenance/revision.py +++ b/swh/provenance/revision.py @@ -1,167 +1,184 @@ -import logging import threading from .archive import ArchiveInterface from datetime import datetime +from typing import Optional -from swh.model.hashutil import hash_to_bytes, hash_to_hex +from swh.model.hashutil import hash_to_bytes class RevisionEntry: def __init__( self, archive: ArchiveInterface, id: bytes, - date: datetime=None, - root: bytes=None, - parents: list=None + date: Optional[datetime] = None, + root: Optional[bytes] = None, + parents: Optional[list] = None, ): self.archive = archive self.id = id self.date = date self.parents = parents self.root = root def __iter__(self): if self.parents is None: self.parents = [] for parent in self.archive.revision_get([self.id]): if parent is not None: self.parents.append( RevisionEntry( self.archive, parent.id, - parents=[RevisionEntry(self.archive, id) for id in parent.parents] + parents=[ + RevisionEntry(self.archive, id) for id in parent.parents + ], ) ) return iter(self.parents) -################################################################################ -################################################################################ +######################################################################################## +######################################################################################## + class RevisionIterator: """Iterator interface.""" def __iter__(self): pass def __next__(self): pass class FileRevisionIterator(RevisionIterator): """Iterator over revisions present in the given CSV file.""" - def __init__(self, filename: str, archive: ArchiveInterface, limit: int=None): + def __init__( + self, filename: str, archive: ArchiveInterface, limit: Optional[int] = None + ): self.file = open(filename) self.idx = 0 self.limit = limit self.mutex = threading.Lock() self.archive = archive def next(self): self.mutex.acquire() line 
= self.file.readline().strip() if line and (self.limit is None or self.idx < self.limit): self.idx = self.idx + 1 - id, date, root = line.strip().split(',') + id, date, root = line.strip().split(",") self.mutex.release() return RevisionEntry( self.archive, hash_to_bytes(id), date=datetime.fromisoformat(date), - root=hash_to_bytes(root) + root=hash_to_bytes(root), ) else: self.mutex.release() return None # class ArchiveRevisionIterator(RevisionIterator): # """Iterator over revisions present in the given database.""" # # def __init__(self, conn, limit=None, chunksize=100): # self.cur = conn.cursor() # self.chunksize = chunksize # self.records = [] # if limit is None: # self.cur.execute('''SELECT id, date, committer_date, directory # FROM revision''') # else: # self.cur.execute('''SELECT id, date, committer_date, directory # FROM revision # LIMIT %s''', (limit,)) # for row in self.cur.fetchmany(self.chunksize): # record = self.make_record(row) # if record is not None: # self.records.append(record) # self.mutex = threading.Lock() # # def __del__(self): # self.cur.close() # # def next(self): # self.mutex.acquire() # if not self.records: # self.records.clear() # for row in self.cur.fetchmany(self.chunksize): # record = self.make_record(row) # if record is not None: # self.records.append(record) # # if self.records: # revision, *self.records = self.records # self.mutex.release() # return revision # else: # self.mutex.release() # return None # # def make_record(self, row): # # Only revision with author or commiter date are considered # if row[1] is not None: # # If the revision has author date, it takes precedence # return RevisionEntry(row[0], row[1], row[3]) # elif row[2] is not None: # # If not, we use the commiter date # return RevisionEntry(row[0], row[2], row[3]) -################################################################################ -################################################################################ +######################################################################################## +######################################################################################## # class RevisionWorker(threading.Thread): # def __init__( # self, # id: int, # conninfo: dict, # archive: ArchiveInterface, # revisions: RevisionIterator # ): # from .provenance import get_provenance # # super().__init__() # self.archive = archive # self.id = id # self.provenance = get_provenance(conninfo) # self.revisions = revisions # # # def run(self): # from .provenance import revision_add # # # while True: # revision = self.revisions.next() # if revision is None: break # # processed = False # while not processed: -# logging.info(f'Thread {self.id} - Processing revision {hash_to_hex(revision.id)} (timestamp: {revision.date})') +# logging.info( +# f'Thread {( +# self.id +# )} - Processing revision {( +# hash_to_hex(revision.id) +# )} (timestamp: {revision.date})' +# ) # processed = revision_add(self.provenance, self.archive, revision) # if not processed: -# logging.warning(f'Thread {self.id} - Failed to process revision {hash_to_hex(revision.id)} (timestamp: {revision.date})') +# logging.warning( +# f'Thread {( +# self.id +# )} - Failed to process revision {( +# hash_to_hex(revision.id) +# )} (timestamp: {revision.date})' +# ) diff --git a/swh/provenance/storage/archive.py b/swh/provenance/storage/archive.py index c01464f..b1d9186 100644 --- a/swh/provenance/storage/archive.py +++ b/swh/provenance/storage/archive.py @@ -1,45 +1,47 @@ -import psycopg2 - from ..archive import ArchiveInterface # 
from functools import lru_cache from methodtools import lru_cache from typing import List from swh.storage import get_storage class ArchiveStorage(ArchiveInterface): def __init__(self, cls: str, **kwargs): self.storage = get_storage(cls, **kwargs) @lru_cache(maxsize=1000000) def directory_ls(self, id: bytes): # TODO: filter unused fields return [entry for entry in self.storage.directory_ls(id)] def iter_origins(self): from swh.storage.algos.origin import iter_origins + yield from iter_origins(self.storage) def iter_origin_visits(self, origin: str): from swh.storage.algos.origin import iter_origin_visits + # TODO: filter unused fields yield from iter_origin_visits(self.storage, origin) def iter_origin_visit_statuses(self, origin: str, visit: int): from swh.storage.algos.origin import iter_origin_visit_statuses + # TODO: filter unused fields yield from iter_origin_visit_statuses(self.storage, origin, visit) def release_get(self, ids: List[bytes]): # TODO: filter unused fields yield from self.storage.release_get(ids) def revision_get(self, ids: List[bytes]): # TODO: filter unused fields yield from self.storage.revision_get(ids) def snapshot_get_all_branches(self, snapshot: bytes): from swh.storage.algos.snapshot import snapshot_get_all_branches + # TODO: filter unused fields return snapshot_get_all_branches(self.storage, snapshot)
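
Note on the refactoring above: the new code repeatedly delegates per-child handling to a directory_update_content helper. As a reading aid, here is a hedged sketch of what such a helper could look like, reconstructed from its call sites and from the inline loops this patch removes; the helper's actual definition may differ (in particular its defaults and whether content_add_to_revision is called unconditionally). FileEntry, DirectoryEntry and the provenance/stack objects are the ones already used in the hunks above.

def directory_update_content(
    stack, provenance, revision, directory, path,
    subdirs=None, blobs=None, blobdates=None,
):
    # Hypothetical reconstruction, not the patch's actual definition.
    assert revision.date is not None
    # Recompute children and their known dates when the caller does not
    # provide them (as in the `elif revision.date < date` branch).
    if blobs is None:
        blobs = [child for child in iter(directory) if isinstance(child, FileEntry)]
    if subdirs is None:
        subdirs = [child for child in iter(directory) if isinstance(child, DirectoryEntry)]
    if blobdates is None:
        blobdates = provenance.content_get_early_dates(blobs)
    # Record each blob for the current revision, keeping only the earliest
    # known date per content object.
    for child in blobs:
        date = blobdates.get(child.id, None)
        if date is None or revision.date < date:
            provenance.content_set_early_date(child, revision.date)
        provenance.content_add_to_revision(revision, child, path)
    # Push subdirectories back onto the stack as (directory, path) pairs, so
    # the main loop in revision_process_content queries their isochrone
    # frontier date right before processing them.
    for child in subdirs:
        stack.append((child, path / child.name))

With the new (directory, path) stack format, subdirectory dates no longer need to travel on the stack: revision_process_content re-queries the isochrone frontier date when it pops an entry, which is why a helper of this shape does not need a dirdates argument.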