diff --git a/mypy.ini b/mypy.ini --- a/mypy.ini +++ b/mypy.ini @@ -17,5 +17,8 @@ [mypy-confluent_kafka.*] ignore_missing_imports = True +[mypy-pyorc.*] +ignore_missing_imports = True + # [mypy-add_your_lib_here.*] # ignore_missing_imports = True diff --git a/requirements.txt b/requirements.txt --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,4 @@ # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html click tqdm +pyorc diff --git a/swh/dataset/cli.py b/swh/dataset/cli.py --- a/swh/dataset/cli.py +++ b/swh/dataset/cli.py @@ -13,6 +13,7 @@ from swh.core.cli import CONTEXT_SETTINGS from swh.core.cli import swh as swh_cli_group from swh.dataset.exporters.edges import GraphEdgesExporter +from swh.dataset.exporters.orc import ORCExporter from swh.dataset.journalprocessor import ParallelJournalProcessor @@ -43,6 +44,7 @@ AVAILABLE_EXPORTERS = { "edges": GraphEdgesExporter, + "orc": ORCExporter, } @@ -96,10 +98,7 @@ if obj_type in exclude_obj_types: continue exporters = [ - ( - AVAILABLE_EXPORTERS[f], - {"export_path": os.path.join(export_path, f, obj_type)}, - ) + (AVAILABLE_EXPORTERS[f], {"export_path": os.path.join(export_path, f)},) for f in export_formats ] parallel_exporter = ParallelJournalProcessor( diff --git a/swh/dataset/exporter.py b/swh/dataset/exporter.py --- a/swh/dataset/exporter.py +++ b/swh/dataset/exporter.py @@ -3,6 +3,8 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import contextlib +import pathlib from types import TracebackType from typing import Any, Dict, Optional, Type @@ -23,10 +25,16 @@ will be called automatically. """ - def __init__(self, config: Dict[str, Any], *args: Any, **kwargs: Any) -> None: + def __init__( + self, config: Dict[str, Any], export_path, *args: Any, **kwargs: Any + ) -> None: self.config: Dict[str, Any] = config + self.export_path = pathlib.Path(export_path) + self.exit_stack = contextlib.ExitStack() def __enter__(self) -> "Exporter": + self.export_path.mkdir(exist_ok=True, parents=True) + self.exit_stack.__enter__() return self def __exit__( @@ -35,7 +43,7 @@ exc_value: Optional[BaseException], traceback: Optional[TracebackType], ) -> Optional[bool]: - pass + return self.exit_stack.__exit__(exc_type, exc_value, traceback) def process_object(self, object_type: str, obj: Dict[str, Any]) -> None: """ diff --git a/swh/dataset/exporters/edges.py b/swh/dataset/exporters/edges.py --- a/swh/dataset/exporters/edges.py +++ b/swh/dataset/exporters/edges.py @@ -6,7 +6,6 @@ import base64 import os import os.path -import pathlib import shlex import subprocess import tempfile @@ -25,32 +24,35 @@ Each row of the CSV is in the format: ` """ - def __init__(self, config, export_path, **kwargs): - super().__init__(config) - self.export_path = export_path - - def __enter__(self): - dataset_path = pathlib.Path(self.export_path) - dataset_path.mkdir(exist_ok=True, parents=True) - unique_id = str(uuid.uuid4()) - nodes_file = dataset_path / ("graph-{}.nodes.csv.zst".format(unique_id)) - edges_file = dataset_path / ("graph-{}.edges.csv.zst".format(unique_id)) - self.node_writer = ZSTFile(nodes_file, "w") - self.edge_writer = ZSTFile(edges_file, "w") - self.node_writer.__enter__() - self.edge_writer.__enter__() - return self - - def __exit__(self, exc_type, exc_value, traceback): - self.node_writer.__exit__(exc_type, exc_value, traceback) - self.edge_writer.__exit__(exc_type, exc_value, traceback) + def __init__(self, *args, **kwargs): + super().__init__(*args, 
**kwargs) + self.writers = {} + + def get_writers_for(self, obj_type: str): + if obj_type not in self.writers: + dataset_path = self.export_path / obj_type + dataset_path.mkdir(exist_ok=True) + unique_id = str(uuid.uuid4()) + nodes_file = dataset_path / ("graph-{}.nodes.csv.zst".format(unique_id)) + edges_file = dataset_path / ("graph-{}.edges.csv.zst".format(unique_id)) + node_writer = self.exit_stack.enter_context(ZSTFile(str(nodes_file), "w")) + edge_writer = self.exit_stack.enter_context(ZSTFile(str(edges_file), "w")) + self.writers[obj_type] = (node_writer, edge_writer) + return self.writers[obj_type] + + def get_node_writer_for(self, obj_type: str): + return self.get_writers_for(obj_type)[0] + + def get_edge_writer_for(self, obj_type: str): + return self.get_writers_for(obj_type)[1] def write_node(self, node): node_type, node_id = node if node_id is None: return node_swhid = swhid(object_type=node_type, object_id=node_id) - self.node_writer.write("{}\n".format(node_swhid)) + node_writer = self.get_node_writer_for(node_type) + node_writer.write("{}\n".format(node_swhid)) def write_edge(self, src, dst, *, labels=None): src_type, src_id = src @@ -60,7 +62,8 @@ src_swhid = swhid(object_type=src_type, object_id=src_id) dst_swhid = swhid(object_type=dst_type, object_id=dst_id) edge_line = " ".join([src_swhid, dst_swhid] + (labels if labels else [])) - self.edge_writer.write("{}\n".format(edge_line)) + edge_writer = self.get_edge_writer_for(src_type) + edge_writer.write("{}\n".format(edge_line)) def process_origin(self, origin): origin_id = origin_identifier({"url": origin["url"]}) diff --git a/swh/dataset/exporters/orc.py b/swh/dataset/exporters/orc.py new file mode 100644 --- /dev/null +++ b/swh/dataset/exporters/orc.py @@ -0,0 +1,251 @@ +# Copyright (C) 2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import datetime +import uuid + +from pyorc import BigInt, Binary, Int, SmallInt, String, Struct, Timestamp, Writer + +from swh.dataset.exporter import ExporterDispatch +from swh.dataset.utils import remove_pull_requests +from swh.model.hashutil import hash_to_hex + +# fmt: off +EXPORT_SCHEMA = { + 'origin': Struct( + url=String() + ), + 'origin_visit': Struct( + origin=String(), + visit=BigInt(), + date=Timestamp(), + type=String(), + ), + 'origin_visit_status': Struct( + origin=String(), + visit=BigInt(), + date=Timestamp(), + status=String(), + snapshot=String(), + ), + 'snapshot': Struct( + id=String(), + ), + 'snapshot_branch': Struct( + snapshot_id=String(), + name=Binary(), + target=String(), + target_type=String(), + ), + 'release': Struct( + id=String(), + name=Binary(), + message=Binary(), + target=String(), + target_type=String(), + author=Binary(), + date=Timestamp(), + date_offset=SmallInt(), + ), + 'revision': Struct( + id=String(), + message=Binary(), + author=Binary(), + date=Timestamp(), + date_offset=SmallInt(), + committer=Binary(), + committer_date=Timestamp(), + committer_offset=SmallInt(), + directory=String(), + ), + 'directory': Struct( + id=String(), + ), + 'directory_entry': Struct( + directory_id=String(), + name=Binary(), + type=String(), + target=String(), + perms=Int(), + ), + 'content': Struct( + sha1=String(), + sha1_git=String(), + sha256=String(), + blake2s256=String(), + length=BigInt(), + status=String(), + ), + 'skipped_content': Struct( + sha1=String(), + 
sha1_git=String(), + sha256=String(), + blake2s256=String(), + length=BigInt(), + status=String(), + reason=String(), + ), +} +# fmt: on + + +def hash_to_hex_or_none(hash): + return hash_to_hex(hash) if hash is not None else None + + +def swh_date_to_datetime(obj): + if obj is None or obj["timestamp"] is None: + return None + return datetime.datetime( + 1970, 1, 1, tzinfo=datetime.timezone.utc + ) + datetime.timedelta( + seconds=obj["timestamp"]["seconds"], + microseconds=obj["timestamp"]["microseconds"], + ) + + +def swh_date_to_offset(obj): + if obj is None: + return None + return obj["offset"] + + +class ORCExporter(ExporterDispatch): + """ + Implementation of an exporter which writes the entire graph dataset as + ORC files. Useful for large scale processing, notably on cloud instances + (e.g BigQuery, Amazon Athena, Azure). + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.writers = {} + + def get_writer_for(self, table_name: str): + if table_name not in self.writers: + object_type_dir = self.export_path / table_name + object_type_dir.mkdir(exist_ok=True) + unique_id = str(uuid.uuid4()) + export_file = object_type_dir / ("graph-{}.orc".format(unique_id)) + export_obj = self.exit_stack.enter_context(export_file.open("wb")) + self.writers[table_name] = self.exit_stack.enter_context( + Writer(export_obj, EXPORT_SCHEMA[table_name]) + ) + return self.writers[table_name] + + def process_origin(self, origin): + origin_writer = self.get_writer_for("origin") + origin_writer.write((origin["url"],)) + + def process_origin_visit(self, visit): + origin_visit_writer = self.get_writer_for("origin_visit") + origin_visit_writer.write( + (visit["origin"], visit["visit"], visit["date"], visit["type"],) + ) + + def process_origin_visit_status(self, visit_status): + origin_visit_status_writer = self.get_writer_for("origin_visit_status") + origin_visit_status_writer.write( + ( + visit_status["origin"], + visit_status["visit"], + visit_status["date"], + visit_status["status"], + hash_to_hex_or_none(visit_status["snapshot"]), + ) + ) + + def process_snapshot(self, snapshot): + if self.config.get("remove_pull_requests"): + remove_pull_requests(snapshot) + snapshot_writer = self.get_writer_for("snapshot") + snapshot_writer.write((hash_to_hex_or_none(snapshot["id"]),)) + + snapshot_branch_writer = self.get_writer_for("snapshot_branch") + for branch_name, branch in snapshot["branches"].items(): + if branch is None: + continue + snapshot_branch_writer.write( + ( + hash_to_hex_or_none(snapshot["id"]), + branch_name, + hash_to_hex_or_none(branch["target"]), + branch["target_type"], + ) + ) + + def process_release(self, release): + release_writer = self.get_writer_for("release") + release_writer.write( + ( + hash_to_hex_or_none(release["id"]), + release["name"], + release["message"], + hash_to_hex_or_none(release["target"]), + release["target_type"], + release["author"]["fullname"], + swh_date_to_datetime(release["date"]), + swh_date_to_offset(release["date"]), + ) + ) + + def process_revision(self, revision): + release_writer = self.get_writer_for("revision") + release_writer.write( + ( + hash_to_hex_or_none(revision["id"]), + revision["message"], + revision["author"]["fullname"], + swh_date_to_datetime(revision["date"]), + swh_date_to_offset(revision["date"]), + revision["committer"]["fullname"], + swh_date_to_datetime(revision["committer_date"]), + swh_date_to_offset(revision["committer_date"]), + hash_to_hex_or_none(revision["directory"]), + ) + ) + + def 
process_directory(self, directory): + directory_writer = self.get_writer_for("directory") + directory_writer.write((hash_to_hex_or_none(directory["id"]),)) + + directory_entry_writer = self.get_writer_for("directory_entry") + for entry in directory["entries"]: + directory_entry_writer.write( + ( + hash_to_hex_or_none(directory["id"]), + entry["name"], + entry["type"], + hash_to_hex_or_none(entry["target"]), + entry["perms"], + ) + ) + + def process_content(self, content): + content_writer = self.get_writer_for("content") + content_writer.write( + ( + hash_to_hex_or_none(content["sha1"]), + hash_to_hex_or_none(content["sha1_git"]), + hash_to_hex_or_none(content["sha256"]), + hash_to_hex_or_none(content["blake2s256"]), + content["length"], + content["status"], + ) + ) + + def process_skipped_content(self, skipped_content): + skipped_content_writer = self.get_writer_for("skipped_content") + skipped_content_writer.write( + ( + hash_to_hex_or_none(skipped_content["sha1"]), + hash_to_hex_or_none(skipped_content["sha1_git"]), + hash_to_hex_or_none(skipped_content["sha256"]), + hash_to_hex_or_none(skipped_content["blake2s256"]), + skipped_content["length"], + skipped_content["status"], + skipped_content["reason"], + ) + ) diff --git a/swh/dataset/test/test_edges.py b/swh/dataset/test/test_edges.py --- a/swh/dataset/test/test_edges.py +++ b/swh/dataset/test/test_edges.py @@ -90,12 +90,13 @@ if config is None: config = {} exporter = GraphEdgesExporter(config, "/dummy_path") - exporter.node_writer = Mock() - exporter.edge_writer = Mock() + node_writer = Mock() + edge_writer = Mock() + exporter.get_writers_for = lambda *a, **k: (node_writer, edge_writer) for object_type, objects in messages.items(): for obj in objects: exporter.process_object(object_type, obj) - return exporter.node_writer.write, exporter.edge_writer.write + return node_writer.write, edge_writer.write return wrapped diff --git a/swh/dataset/test/test_orc.py b/swh/dataset/test/test_orc.py new file mode 100644 --- /dev/null +++ b/swh/dataset/test/test_orc.py @@ -0,0 +1,155 @@ +import collections +from pathlib import Path +import tempfile + +import pyorc +import pytest + +from swh.dataset.exporters.orc import ( + ORCExporter, + hash_to_hex_or_none, + swh_date_to_datetime, + swh_date_to_offset, +) +from swh.model.tests.swh_model_data import TEST_OBJECTS + + +@pytest.fixture +def exporter(): + def wrapped(messages, config=None): + with tempfile.TemporaryDirectory() as tmpname: + tmppath = Path(tmpname) + if config is None: + config = {} + with ORCExporter(config, tmppath) as exporter: + for object_type, objects in messages.items(): + for obj in objects: + exporter.process_object(object_type, obj.to_dict()) + res = collections.defaultdict(set) + for obj_type_dir in tmppath.iterdir(): + for orc_file in obj_type_dir.iterdir(): + with orc_file.open("rb") as orc_obj: + res[obj_type_dir.name] |= set(pyorc.Reader(orc_obj)) + return res + + return wrapped + + +def test_export_origin(exporter): + obj_type = "origin" + output = exporter({obj_type: TEST_OBJECTS[obj_type]}) + for obj in TEST_OBJECTS[obj_type]: + assert (obj.url,) in output[obj_type] + + +def test_export_origin_visit(exporter): + obj_type = "origin_visit" + output = exporter({obj_type: TEST_OBJECTS[obj_type]}) + for obj in TEST_OBJECTS[obj_type]: + assert (obj.origin, obj.visit, obj.date, obj.type) in output[obj_type] + + +def test_export_origin_visit_status(exporter): + obj_type = "origin_visit_status" + output = exporter({obj_type: TEST_OBJECTS[obj_type]}) + for obj in 
TEST_OBJECTS[obj_type]: + assert ( + obj.origin, + obj.visit, + obj.date, + obj.status, + hash_to_hex_or_none(obj.snapshot), + ) in output[obj_type] + + +def test_export_snapshot(exporter): + obj_type = "snapshot" + output = exporter({obj_type: TEST_OBJECTS[obj_type]}) + for obj in TEST_OBJECTS[obj_type]: + assert (hash_to_hex_or_none(obj.id),) in output["snapshot"] + for branch_name, branch in obj.branches.items(): + if branch is None: + continue + assert ( + hash_to_hex_or_none(obj.id), + branch_name, + hash_to_hex_or_none(branch.target), + str(branch.target_type.value), + ) in output["snapshot_branch"] + + +def test_export_release(exporter): + obj_type = "release" + output = exporter({obj_type: TEST_OBJECTS[obj_type]}) + for obj in TEST_OBJECTS[obj_type]: + assert ( + hash_to_hex_or_none(obj.id), + obj.name, + obj.message, + hash_to_hex_or_none(obj.target), + obj.target_type.value, + obj.author.fullname, + swh_date_to_datetime(obj.date.to_dict()), + swh_date_to_offset(obj.date.to_dict()), + ) in output[obj_type] + + +def test_export_revision(exporter): + obj_type = "revision" + output = exporter({obj_type: TEST_OBJECTS[obj_type]}) + for obj in TEST_OBJECTS[obj_type]: + assert ( + hash_to_hex_or_none(obj.id), + obj.message, + obj.author.fullname, + swh_date_to_datetime(obj.date.to_dict()), + swh_date_to_offset(obj.date.to_dict()), + obj.committer.fullname, + swh_date_to_datetime(obj.committer_date.to_dict()), + swh_date_to_offset(obj.committer_date.to_dict()), + hash_to_hex_or_none(obj.directory), + ) in output[obj_type] + + +def test_export_directory(exporter): + obj_type = "directory" + output = exporter({obj_type: TEST_OBJECTS[obj_type]}) + for obj in TEST_OBJECTS[obj_type]: + assert (hash_to_hex_or_none(obj.id),) in output["directory"] + for entry in obj.entries: + assert ( + hash_to_hex_or_none(obj.id), + entry.name, + entry.type, + hash_to_hex_or_none(entry.target), + entry.perms, + ) in output["directory_entry"] + + +def test_export_content(exporter): + obj_type = "content" + output = exporter({obj_type: TEST_OBJECTS[obj_type]}) + for obj in TEST_OBJECTS[obj_type]: + assert ( + hash_to_hex_or_none(obj.sha1), + hash_to_hex_or_none(obj.sha1_git), + hash_to_hex_or_none(obj.sha256), + hash_to_hex_or_none(obj.blake2s256), + obj.length, + obj.status, + ) in output[obj_type] + + +def test_export_skipped_content(exporter): + obj_type = "skipped_content" + output = exporter({obj_type: TEST_OBJECTS[obj_type]}) + for obj in TEST_OBJECTS[obj_type]: + assert ( + hash_to_hex_or_none(obj.sha1), + hash_to_hex_or_none(obj.sha1_git), + hash_to_hex_or_none(obj.sha256), + hash_to_hex_or_none(obj.blake2s256), + obj.length, + obj.status, + obj.reason, + ) in output[obj_type] diff --git a/swh/dataset/utils.py b/swh/dataset/utils.py --- a/swh/dataset/utils.py +++ b/swh/dataset/utils.py @@ -95,7 +95,7 @@ original_branch_name = branch_name while branch and branch.get("target_type") == "alias": branch_name = branch["target"] - branch = snapshot["branches"][branch_name] + branch = snapshot["branches"].get(branch_name) if branch is None or not branch_name: continue if branch_name.startswith(b"refs/") and not (
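
Note: with the cli.py change above, the new exporter writes one ORC file per table under <export_path>/orc/<table_name>/ (see ORCExporter.get_writer_for). Below is a minimal sketch of reading an exported table back with pyorc, mirroring the test fixture; the "export/orc" directory and the "origin" table are illustrative assumptions, not part of this change:

    import pathlib

    import pyorc

    # A minimal sketch, assuming an export was already produced under "export/orc/".
    # Each row is a tuple whose fields match EXPORT_SCHEMA for that table,
    # e.g. (url,) for 'origin'.
    for orc_file in pathlib.Path("export/orc/origin").glob("graph-*.orc"):
        with orc_file.open("rb") as f:
            for row in pyorc.Reader(f):
                print(row)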