diff --git a/mypy.ini b/mypy.ini
index 04cfde2..fe5b22f 100644
--- a/mypy.ini
+++ b/mypy.ini
@@ -1,21 +1,24 @@
 [mypy]
 namespace_packages = True
 warn_unused_ignores = True
 
 
 # 3rd party libraries without stubs (yet)
 
 [mypy-pkg_resources.*]
 ignore_missing_imports = True
 
 [mypy-pytest.*]
 ignore_missing_imports = True
 
 [mypy-tqdm.*]
 ignore_missing_imports = True
 
 [mypy-confluent_kafka.*]
 ignore_missing_imports = True
 
+[mypy-pyorc.*]
+ignore_missing_imports = True
+
 # [mypy-add_your_lib_here.*]
 # ignore_missing_imports = True
diff --git a/requirements.txt b/requirements.txt
index 6ddf609..329053e 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,5 +1,6 @@
 # Add here external Python modules dependencies, one per line. Module names
 # should match https://pypi.python.org/pypi names. For the full spec or
 # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html
 click
 tqdm
+pyorc
diff --git a/swh/dataset/cli.py b/swh/dataset/cli.py
index 6c129e6..055ad8a 100644
--- a/swh/dataset/cli.py
+++ b/swh/dataset/cli.py
@@ -1,124 +1,126 @@
 # Copyright (C) 2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 # WARNING: do not import unnecessary things here to keep cli startup time under
 # control
 import os
 import pathlib
 
 import click
 
 from swh.core.cli import CONTEXT_SETTINGS
 from swh.core.cli import swh as swh_cli_group
 from swh.dataset.exporters.edges import GraphEdgesExporter
+from swh.dataset.exporters.orc import ORCExporter
 from swh.dataset.journalprocessor import ParallelJournalProcessor
 
 
 @swh_cli_group.group(name="dataset", context_settings=CONTEXT_SETTINGS)
 @click.option(
     "--config-file",
     "-C",
     default=None,
     type=click.Path(exists=True, dir_okay=False),
     help="Configuration file.",
 )
 @click.pass_context
 def dataset_cli_group(ctx, config_file):
     """Software Heritage Dataset Tools"""
     from swh.core import config
 
     ctx.ensure_object(dict)
     conf = config.read(config_file)
     ctx.obj["config"] = conf
 
 
 @dataset_cli_group.group("graph")
 @click.pass_context
 def graph(ctx):
     """Manage graph export"""
     pass
 
 
 AVAILABLE_EXPORTERS = {
     "edges": GraphEdgesExporter,
+    "orc": ORCExporter,
 }
 
 
 @graph.command("export")
 @click.argument("export-path", type=click.Path())
 @click.option("--export-id", "-e", help="Unique ID of the export run.")
 @click.option(
     "--formats",
     "-f",
     type=click.STRING,
     default=",".join(AVAILABLE_EXPORTERS.keys()),
     show_default=True,
     help="Formats to export.",
 )
 @click.option("--processes", "-p", default=1, help="Number of parallel processes")
 @click.option(
     "--exclude",
     type=click.STRING,
     help="Comma-separated list of object types to exclude",
 )
 @click.pass_context
 def export_graph(ctx, export_path, export_id, formats, exclude, processes):
     """Export the Software Heritage graph as an edge dataset."""
     import uuid
 
     config = ctx.obj["config"]
 
     if not export_id:
         export_id = str(uuid.uuid4())
 
     exclude_obj_types = {o.strip() for o in exclude.split(",")}
     export_formats = [c.strip() for c in formats.split(",")]
     for f in export_formats:
         if f not in AVAILABLE_EXPORTERS:
             raise click.BadOptionUsage(
                 option_name="formats", message=f"{f} is not an available format."
             )
 
     # Run the exporter for each edge type.
     object_types = [
         "origin",
         "origin_visit",
         "origin_visit_status",
         "snapshot",
         "release",
         "revision",
         "directory",
         "content",
         "skipped_content",
     ]
     for obj_type in object_types:
         if obj_type in exclude_obj_types:
             continue
         exporters = [
             (
                 AVAILABLE_EXPORTERS[f],
                 {"export_path": os.path.join(export_path, f, obj_type)},
             )
             for f in export_formats
         ]
         parallel_exporter = ParallelJournalProcessor(
             config,
             exporters,
             export_id,
             obj_type,
             node_sets_path=pathlib.Path(export_path) / ".node_sets" / obj_type,
             processes=processes,
         )
         print("Exporting {}:".format(obj_type))
         parallel_exporter.run()
 
 
 @graph.command("sort")
 @click.argument("export-path", type=click.Path())
 @click.pass_context
 def sort_graph(ctx, export_path):
     config = ctx.obj["config"]
     from swh.dataset.exporters.edges import sort_graph_nodes
 
     sort_graph_nodes(export_path, config)
diff --git a/swh/dataset/exporters/orc.py b/swh/dataset/exporters/orc.py
new file mode 100644
index 0000000..609d161
--- /dev/null
+++ b/swh/dataset/exporters/orc.py
@@ -0,0 +1,260 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import contextlib
+from datetime import datetime
+import pathlib
+import uuid
+
+from pyorc import BigInt, Binary, Int, SmallInt, String, Struct, Timestamp, Writer
+
+from swh.dataset.exporter import ExporterDispatch
+from swh.dataset.utils import remove_pull_requests
+from swh.model.hashutil import hash_to_hex
+
+# fmt: off
+EXPORT_SCHEMA = {
+    'origin': Struct(
+        url=String()
+    ),
+    'origin_visit': Struct(
+        origin=String(),
+        visit=BigInt(),
+        date=Timestamp(),
+        type=String(),
+    ),
+    'origin_visit_status': Struct(
+        origin=String(),
+        visit=BigInt(),
+        date=Timestamp(),
+        status=String(),
+        snapshot=String(),
+    ),
+    'snapshot': Struct(
+        id=String(),
+    ),
+    'snapshot_branch': Struct(
+        snapshot_id=String(),
+        name=Binary(),
+        target=String(),
+        target_type=String(),
+    ),
+    'release': Struct(
+        id=String(),
+        name=Binary(),
+        message=Binary(),
+        target=String(),
+        target_type=String(),
+        author=Binary(),
+        date=Timestamp(),
+        date_offset=SmallInt(),
+    ),
+    'revision': Struct(
+        id=String(),
+        message=Binary(),
+        author=Binary(),
+        date=Timestamp(),
+        date_offset=SmallInt(),
+        committer=Binary(),
+        committer_date=Timestamp(),
+        committer_offset=SmallInt(),
+        directory=String(),
+    ),
+    'directory': Struct(
+        id=String(),
+    ),
+    'directory_entry': Struct(
+        directory_id=String(),
+        name=Binary(),
+        type=String(),
+        target=String(),
+        perms=Int(),
+    ),
+    'content': Struct(
+        sha1=String(),
+        sha1_git=String(),
+        sha256=String(),
+        blake2s256=String(),
+        length=BigInt(),
+        status=String(),
+    ),
+    'skipped_content': Struct(
+        sha1=String(),
+        sha1_git=String(),
+        sha256=String(),
+        blake2s256=String(),
+        length=BigInt(),
+        status=String(),
+        reason=String(),
+    ),
+}
+# fmt: on
+
+
+def hash_to_hex_or_none(hash):
+    return hash_to_hex(hash) if hash is not None else None
+
+
+def swh_date_to_datetime(obj):
+    if obj is None or obj["timestamp"] is None:
+        return None
+    return datetime.fromtimestamp(
+        obj["timestamp"]["seconds"] + (obj["timestamp"]["microseconds"] / 1e6)
+    )
+
+
+def swh_date_to_offset(obj):
+    if obj is None:
+        return None
+    return obj["offset"]
+
+
+class ORCExporter(ExporterDispatch):
+    """
+    Implementation of an exporter which writes the entire graph dataset as
+    ORC files. Useful for large scale processing, notably on cloud instances
+    (e.g. BigQuery, Amazon Athena, Azure).
+    """
+
+    def __init__(self, config, export_path, **kwargs):
+        super().__init__(config)
+        self.export_path = pathlib.Path(export_path)
+        self.export_path.mkdir(exist_ok=True, parents=True)
+        self.writers = {}
+        self.exit_stack = contextlib.ExitStack()
+
+    def __enter__(self):
+        self.exit_stack.__enter__()
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        self.exit_stack.__exit__(exc_type, exc_value, traceback)
+
+    def get_writer_for(self, table_name: str):
+        if table_name not in self.writers:
+            object_type_dir = self.export_path / table_name
+            object_type_dir.mkdir(exist_ok=True)
+            unique_id = str(uuid.uuid4())
+            export_file = object_type_dir / ("graph-{}.orc".format(unique_id))
+            export_obj = self.exit_stack.enter_context(export_file.open("wb"))
+            self.writers[table_name] = self.exit_stack.enter_context(
+                Writer(export_obj, EXPORT_SCHEMA[table_name])
+            )
+        return self.writers[table_name]
+
+    def process_origin(self, origin):
+        origin_writer = self.get_writer_for("origin")
+        origin_writer.write((origin["url"],))
+
+    def process_origin_visit(self, visit):
+        origin_visit_writer = self.get_writer_for("origin_visit")
+        origin_visit_writer.write(
+            (visit["origin"], visit["visit"], visit["date"], visit["type"],)
+        )
+
+    def process_origin_visit_status(self, visit_status):
+        origin_visit_status_writer = self.get_writer_for("origin_visit_status")
+        origin_visit_status_writer.write(
+            (
+                visit_status["origin"],
+                visit_status["visit"],
+                visit_status["date"],
+                visit_status["status"],
+                hash_to_hex_or_none(visit_status["snapshot"]),
+            )
+        )
+
+    def process_snapshot(self, snapshot):
+        if self.config.get("remove_pull_requests"):
+            remove_pull_requests(snapshot)
+        snapshot_writer = self.get_writer_for("snapshot")
+        snapshot_writer.write((hash_to_hex_or_none(snapshot["id"]),))
+
+        snapshot_branch_writer = self.get_writer_for("snapshot_branch")
+        for branch_name, branch in snapshot["branches"].items():
+            if branch is None:
+                continue
+            snapshot_branch_writer.write(
+                (
+                    hash_to_hex_or_none(snapshot["id"]),
+                    branch_name,
+                    hash_to_hex_or_none(branch["target"]),
+                    branch["target_type"],
+                )
+            )
+
+    def process_release(self, release):
+        release_writer = self.get_writer_for("release")
+        release_writer.write(
+            (
+                hash_to_hex_or_none(release["id"]),
+                release["name"],
+                release["message"],
+                hash_to_hex_or_none(release["target"]),
+                release["target_type"],
+                release["author"]["fullname"],
+                swh_date_to_datetime(release["date"]),
+                swh_date_to_offset(release["date"]),
+            )
+        )
+
+    def process_revision(self, revision):
+        revision_writer = self.get_writer_for("revision")
+        revision_writer.write(
+            (
+                hash_to_hex_or_none(revision["id"]),
+                revision["message"],
+                revision["author"]["fullname"],
+                swh_date_to_datetime(revision["date"]),
+                swh_date_to_offset(revision["date"]),
+                revision["committer"]["fullname"],
+                swh_date_to_datetime(revision["committer_date"]),
+                swh_date_to_offset(revision["committer_date"]),
+                hash_to_hex_or_none(revision["directory"]),
+            )
+        )
+
+    def process_directory(self, directory):
+        directory_writer = self.get_writer_for("directory")
+        directory_writer.write((hash_to_hex_or_none(directory["id"]),))
+
+        directory_entry_writer = self.get_writer_for("directory_entry")
+        for entry in directory["entries"]:
+            directory_entry_writer.write(
+                (
+                    hash_to_hex_or_none(directory["id"]),
+                    entry["name"],
+                    entry["type"],
+                    hash_to_hex_or_none(entry["target"]),
entry["perms"], + ) + ) + + def process_content(self, content): + content_writer = self.get_writer_for("content") + content_writer.write( + ( + hash_to_hex_or_none(content["sha1"]), + hash_to_hex_or_none(content["sha1_git"]), + hash_to_hex_or_none(content["sha256"]), + hash_to_hex_or_none(content["blake2s256"]), + content["length"], + content["status"], + ) + ) + + def process_skipped_content(self, skipped_content): + skipped_content_writer = self.get_writer_for("skipped_content") + skipped_content_writer.write( + ( + hash_to_hex_or_none(skipped_content["sha1"]), + hash_to_hex_or_none(skipped_content["sha1_git"]), + hash_to_hex_or_none(skipped_content["sha256"]), + hash_to_hex_or_none(skipped_content["blake2s256"]), + skipped_content["length"], + skipped_content["status"], + skipped_content["reason"], + ) + ) diff --git a/swh/dataset/utils.py b/swh/dataset/utils.py index 39bf829..977c5b4 100644 --- a/swh/dataset/utils.py +++ b/swh/dataset/utils.py @@ -1,105 +1,105 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import sqlite3 import subprocess class ZSTFile: """ Object-like wrapper around a ZST file. Uses a subprocess of the "zstd" command to compress and deflate the objects. """ def __init__(self, path: str, mode: str = "r"): if mode not in ("r", "rb", "w", "wb"): raise ValueError(f"ZSTFile mode {mode} is invalid.") self.path = path self.mode = mode def __enter__(self) -> "ZSTFile": is_text = not (self.mode in ("rb", "wb")) writing = self.mode in ("w", "wb") if writing: cmd = ["zstd", "-q", "-o", self.path] else: cmd = ["zstdcat", self.path] self.process = subprocess.Popen( cmd, text=is_text, stdin=subprocess.PIPE, stdout=subprocess.PIPE, ) return self def __exit__(self, exc_type, exc_value, tb): self.process.stdin.close() self.process.stdout.close() self.process.wait() def read(self, *args): return self.process.stdout.read(*args) def write(self, buf): self.process.stdin.write(buf) class SQLiteSet: """ On-disk Set object for hashes using SQLite as an indexer backend. Used to deduplicate objects when processing large queues with duplicates. """ def __init__(self, db_path): self.db_path = db_path def __enter__(self): self.db = sqlite3.connect(str(self.db_path)) self.db.execute( "CREATE TABLE IF NOT EXISTS" " tmpset (val TEXT NOT NULL PRIMARY KEY)" " WITHOUT ROWID" ) self.db.execute("PRAGMA synchronous = OFF") self.db.execute("PRAGMA journal_mode = OFF") return self def __exit__(self, exc_type, exc_val, exc_tb): self.db.commit() self.db.close() def add(self, v: bytes) -> bool: """ Add an item to the set. Args: v: The value to add to the set. Returns: True if the value was added to the set, False if it was already present. """ try: self.db.execute("INSERT INTO tmpset(val) VALUES (?)", (v.hex(),)) except sqlite3.IntegrityError: return False else: return True def remove_pull_requests(snapshot): """ Heuristic to filter out pull requests in snapshots: remove all branches that start with refs/ but do not start with refs/heads or refs/tags. 
""" # Copy the items with list() to remove items during iteration for branch_name, branch in list(snapshot["branches"].items()): original_branch_name = branch_name while branch and branch.get("target_type") == "alias": branch_name = branch["target"] - branch = snapshot["branches"][branch_name] + branch = snapshot["branches"].get(branch_name) if branch is None or not branch_name: continue if branch_name.startswith(b"refs/") and not ( branch_name.startswith(b"refs/heads") or branch_name.startswith(b"refs/tags") ): snapshot["branches"].pop(original_branch_name)