diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -17,5 +17,8 @@
 [mypy-confluent_kafka.*]
 ignore_missing_imports = True
 
+[mypy-pyorc.*]
+ignore_missing_imports = True
+
 # [mypy-add_your_lib_here.*]
 # ignore_missing_imports = True
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -3,3 +3,4 @@
 # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html
 click
 tqdm
+pyorc
diff --git a/swh/dataset/cli.py b/swh/dataset/cli.py
--- a/swh/dataset/cli.py
+++ b/swh/dataset/cli.py
@@ -13,6 +13,7 @@
 from swh.core.cli import CONTEXT_SETTINGS
 from swh.core.cli import swh as swh_cli_group
 from swh.dataset.exporters.edges import GraphEdgesExporter
+from swh.dataset.exporters.orc import ORCExporter
 from swh.dataset.journalprocessor import ParallelJournalProcessor
 
 
@@ -43,6 +44,7 @@
 
 AVAILABLE_EXPORTERS = {
     "edges": GraphEdgesExporter,
+    "orc": ORCExporter,
 }
 
 
diff --git a/swh/dataset/exporters/orc.py b/swh/dataset/exporters/orc.py
new file mode 100644
--- /dev/null
+++ b/swh/dataset/exporters/orc.py
@@ -0,0 +1,260 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import contextlib
from datetime import datetime, timezone
import pathlib
import uuid

from pyorc import BigInt, Binary, Int, SmallInt, String, Struct, Timestamp, Writer

from swh.dataset.exporter import ExporterDispatch
from swh.dataset.utils import remove_pull_requests
from swh.model.hashutil import hash_to_hex

# fmt: off
EXPORT_SCHEMA = {
    'origin': Struct(
        url=String()
    ),
    'origin_visit': Struct(
        origin=String(),
        visit=BigInt(),
        date=Timestamp(),
        type=String(),
    ),
    'origin_visit_status': Struct(
        origin=String(),
        visit=BigInt(),
        date=Timestamp(),
        status=String(),
        snapshot=String(),
    ),
    'snapshot': Struct(
        id=String(),
    ),
    'snapshot_branch': Struct(
        snapshot_id=String(),
        name=Binary(),
        target=String(),
        target_type=String(),
    ),
    'release': Struct(
        id=String(),
        name=Binary(),
        message=Binary(),
        target=String(),
        target_type=String(),
        author=Binary(),
        date=Timestamp(),
        date_offset=SmallInt(),
    ),
    'revision': Struct(
        id=String(),
        message=Binary(),
        author=Binary(),
        date=Timestamp(),
        date_offset=SmallInt(),
        committer=Binary(),
        committer_date=Timestamp(),
        committer_offset=SmallInt(),
        directory=String(),
    ),
    'directory': Struct(
        id=String(),
    ),
    'directory_entry': Struct(
        directory_id=String(),
        name=Binary(),
        type=String(),
        target=String(),
        perms=Int(),
    ),
    'content': Struct(
        sha1=String(),
        sha1_git=String(),
        sha256=String(),
        blake2s256=String(),
        length=BigInt(),
        status=String(),
    ),
    'skipped_content': Struct(
        sha1=String(),
        sha1_git=String(),
        sha256=String(),
        blake2s256=String(),
        length=BigInt(),
        status=String(),
        reason=String(),
    ),
}
# fmt: on


def hash_to_hex_or_none(hash):
    return hash_to_hex(hash) if hash is not None else None


def swh_date_to_datetime(obj):
    if obj is None or obj["timestamp"] is None:
        return None
    # SWH timestamps are UTC epochs: build a timezone-aware datetime so the
    # exported value does not depend on the local timezone of the machine.
    return datetime.fromtimestamp(
        obj["timestamp"]["seconds"] + (obj["timestamp"]["microseconds"] / 1e6),
        tz=timezone.utc,
    )


def swh_date_to_offset(obj):
    if obj is None:
        return None
    return obj["offset"]


class ORCExporter(ExporterDispatch):
    """
    Implementation of an exporter which writes the entire graph dataset as
    ORC files. Useful for large-scale processing, notably with cloud
    analytics services (e.g. BigQuery, Amazon Athena, Azure).
    """

    def __init__(self, config, export_path, **kwargs):
        super().__init__(config)
        self.export_path = pathlib.Path(export_path)
        self.export_path.mkdir(exist_ok=True, parents=True)
        self.writers = {}
        self.exit_stack = contextlib.ExitStack()

    def __enter__(self):
        self.exit_stack.__enter__()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.exit_stack.__exit__(exc_type, exc_value, traceback)

    def get_writer_for(self, table_name: str):
        # Lazily create one ORC file per object type; the file and its writer
        # are registered on the exit stack so both are closed when the
        # exporter context exits.
        if table_name not in self.writers:
            object_type_dir = self.export_path / table_name
            object_type_dir.mkdir(exist_ok=True)
            unique_id = str(uuid.uuid4())
            export_file = object_type_dir / ("graph-{}.orc".format(unique_id))
            export_obj = self.exit_stack.enter_context(export_file.open("wb"))
            self.writers[table_name] = self.exit_stack.enter_context(
                Writer(export_obj, EXPORT_SCHEMA[table_name])
            )
        return self.writers[table_name]

    def process_origin(self, origin):
        origin_writer = self.get_writer_for("origin")
        origin_writer.write((origin["url"],))

    def process_origin_visit(self, visit):
        origin_visit_writer = self.get_writer_for("origin_visit")
        origin_visit_writer.write(
            (visit["origin"], visit["visit"], visit["date"], visit["type"],)
        )

    def process_origin_visit_status(self, visit_status):
        origin_visit_status_writer = self.get_writer_for("origin_visit_status")
        origin_visit_status_writer.write(
            (
                visit_status["origin"],
                visit_status["visit"],
                visit_status["date"],
                visit_status["status"],
                hash_to_hex_or_none(visit_status["snapshot"]),
            )
        )

    def process_snapshot(self, snapshot):
        if self.config.get("remove_pull_requests"):
            remove_pull_requests(snapshot)
        snapshot_writer = self.get_writer_for("snapshot")
        snapshot_writer.write((hash_to_hex_or_none(snapshot["id"]),))

        snapshot_branch_writer = self.get_writer_for("snapshot_branch")
        for branch_name, branch in snapshot["branches"].items():
            if branch is None:
                continue
            snapshot_branch_writer.write(
                (
                    hash_to_hex_or_none(snapshot["id"]),
                    branch_name,
                    hash_to_hex_or_none(branch["target"]),
                    branch["target_type"],
                )
            )

    def process_release(self, release):
        release_writer = self.get_writer_for("release")
        # author can be None for some archived releases
        author = release["author"]
        release_writer.write(
            (
                hash_to_hex_or_none(release["id"]),
                release["name"],
                release["message"],
                hash_to_hex_or_none(release["target"]),
                release["target_type"],
                author["fullname"] if author is not None else None,
                swh_date_to_datetime(release["date"]),
                swh_date_to_offset(release["date"]),
            )
        )

    def process_revision(self, revision):
        revision_writer = self.get_writer_for("revision")
        # author and committer can be None for some archived revisions
        author = revision["author"]
        committer = revision["committer"]
        revision_writer.write(
            (
                hash_to_hex_or_none(revision["id"]),
                revision["message"],
                author["fullname"] if author is not None else None,
                swh_date_to_datetime(revision["date"]),
                swh_date_to_offset(revision["date"]),
                committer["fullname"] if committer is not None else None,
                swh_date_to_datetime(revision["committer_date"]),
                swh_date_to_offset(revision["committer_date"]),
                hash_to_hex_or_none(revision["directory"]),
            )
        )

    def process_directory(self, directory):
        directory_writer = self.get_writer_for("directory")
        directory_writer.write((hash_to_hex_or_none(directory["id"]),))

        directory_entry_writer = self.get_writer_for("directory_entry")
        for entry in directory["entries"]:
            directory_entry_writer.write(
                (
                    hash_to_hex_or_none(directory["id"]),
                    entry["name"],
                    entry["type"],
hash_to_hex_or_none(entry["target"]), + entry["perms"], + ) + ) + + def process_content(self, content): + content_writer = self.get_writer_for("content") + content_writer.write( + ( + hash_to_hex_or_none(content["sha1"]), + hash_to_hex_or_none(content["sha1_git"]), + hash_to_hex_or_none(content["sha256"]), + hash_to_hex_or_none(content["blake2s256"]), + content["length"], + content["status"], + ) + ) + + def process_skipped_content(self, skipped_content): + skipped_content_writer = self.get_writer_for("skipped_content") + skipped_content_writer.write( + ( + hash_to_hex_or_none(skipped_content["sha1"]), + hash_to_hex_or_none(skipped_content["sha1_git"]), + hash_to_hex_or_none(skipped_content["sha256"]), + hash_to_hex_or_none(skipped_content["blake2s256"]), + skipped_content["length"], + skipped_content["status"], + skipped_content["reason"], + ) + ) diff --git a/swh/dataset/utils.py b/swh/dataset/utils.py --- a/swh/dataset/utils.py +++ b/swh/dataset/utils.py @@ -95,7 +95,7 @@ original_branch_name = branch_name while branch and branch.get("target_type") == "alias": branch_name = branch["target"] - branch = snapshot["branches"][branch_name] + branch = snapshot["branches"].get(branch_name) if branch is None or not branch_name: continue if branch_name.startswith(b"refs/") and not (
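
Usage notes (not part of the patch): in production the process_* callbacks are
invoked by ParallelJournalProcessor through the swh dataset CLI, but the
exporter can be exercised on its own for review. A minimal sketch, assuming
ExporterDispatch needs nothing beyond the config dict (as the constructor here
suggests); the config, export path, and record values are made up for
illustration:

    from swh.dataset.exporters.orc import ORCExporter

    config = {"remove_pull_requests": False}  # hypothetical config

    with ORCExporter(config, "/tmp/swh-orc-export") as exporter:
        # one row in the 'origin' table
        exporter.process_origin({"url": "https://example.com/repo.git"})
        # one row in the 'content' table; hashes are raw bytes,
        # hex-encoded by hash_to_hex_or_none on write
        exporter.process_content({
            "sha1": b"\x00" * 20,
            "sha1_git": b"\x00" * 20,
            "sha256": b"\x00" * 32,
            "blake2s256": b"\x00" * 32,
            "length": 42,
            "status": "visible",
        })
    # leaving the context closes every Writer and file via the ExitStack,
    # flushing the ORC footers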
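The resulting files can be sanity-checked with pyorc itself. A rough sketch:
the glob pattern follows the <export_path>/<table_name>/graph-<uuid>.orc
layout created by get_writer_for, reusing the made-up export path from above:

    import glob

    import pyorc

    for path in glob.glob("/tmp/swh-orc-export/origin/graph-*.orc"):
        with open(path, "rb") as f:
            reader = pyorc.Reader(f)
            print(reader.schema)  # struct<url:string>
            for row in reader:    # rows come back as tuples
                print(row)        # e.g. ('https://example.com/repo.git',)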