# =========================================================================
# swh/dataset/exporters/orc.py
# (reconstructed post-patch state of the collapsed diff; original patch
#  header: diff --git a/swh/dataset/exporters/orc.py ... index 609d161..a0d984c)
# =========================================================================
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import contextlib
from datetime import datetime, timezone
import pathlib
import uuid

from pyorc import BigInt, Binary, Int, SmallInt, String, Struct, Timestamp, Writer

from swh.dataset.exporter import ExporterDispatch
from swh.dataset.utils import remove_pull_requests
from swh.model.hashutil import hash_to_hex

# ORC column layout of each exported table, keyed by table name.
# fmt: off
EXPORT_SCHEMA = {
    'origin': Struct(
        url=String()
    ),
    'origin_visit': Struct(
        origin=String(),
        visit=BigInt(),
        date=Timestamp(),
        type=String(),
    ),
    'origin_visit_status': Struct(
        origin=String(),
        visit=BigInt(),
        date=Timestamp(),
        status=String(),
        snapshot=String(),
    ),
    'snapshot': Struct(
        id=String(),
    ),
    'snapshot_branch': Struct(
        snapshot_id=String(),
        name=Binary(),
        target=String(),
        target_type=String(),
    ),
    'release': Struct(
        id=String(),
        name=Binary(),
        message=Binary(),
        target=String(),
        target_type=String(),
        author=Binary(),
        date=Timestamp(),
        date_offset=SmallInt(),
    ),
    'revision': Struct(
        id=String(),
        message=Binary(),
        author=Binary(),
        date=Timestamp(),
        date_offset=SmallInt(),
        committer=Binary(),
        committer_date=Timestamp(),
        committer_offset=SmallInt(),
        directory=String(),
    ),
    'directory': Struct(
        id=String(),
    ),
    'directory_entry': Struct(
        directory_id=String(),
        name=Binary(),
        type=String(),
        target=String(),
        perms=Int(),
    ),
    'content': Struct(
        sha1=String(),
        sha1_git=String(),
        sha256=String(),
        blake2s256=String(),
        length=BigInt(),
        status=String(),
    ),
    'skipped_content': Struct(
        sha1=String(),
        sha1_git=String(),
        sha256=String(),
        blake2s256=String(),
        length=BigInt(),
        status=String(),
        reason=String(),
    ),
}
# fmt: on


def hash_to_hex_or_none(hash):
    """Return the hexadecimal form of *hash*, or ``None`` if *hash* is None."""
    return hash_to_hex(hash) if hash is not None else None


def swh_date_to_datetime(obj):
    """Convert a swh date dict (``{"timestamp": {"seconds", "microseconds"},
    "offset": ...}``) into a timezone-aware UTC :class:`datetime`.

    Returns ``None`` when the date or its timestamp is missing.
    """
    if obj is None or obj["timestamp"] is None:
        return None
    # The epoch value is UTC by definition, so build the datetime directly in
    # UTC.  (Calling ``fromtimestamp`` without ``tz`` and then slapping
    # ``tzinfo=timezone.utc`` on the result — as the original code did —
    # converts to *local* wall-clock time first and then mislabels it as UTC.)
    return datetime.fromtimestamp(
        obj["timestamp"]["seconds"] + (obj["timestamp"]["microseconds"] / 1e6),
        tz=timezone.utc,
    )


def swh_date_to_offset(obj):
    """Return the UTC offset field of a swh date dict, or ``None``."""
    if obj is None:
        return None
    return obj["offset"]


class ORCExporter(ExporterDispatch):
    """
    Implementation of an exporter which writes the entire graph dataset as
    ORC files. Useful for large scale processing, notably on cloud instances
    (e.g BigQuery, Amazon Athena, Azure).
    """

    def __init__(self, config, export_path, **kwargs):
        super().__init__(config)
        self.export_path = pathlib.Path(export_path)
        self.export_path.mkdir(exist_ok=True, parents=True)
        # One lazily-created pyorc.Writer per table name.
        self.writers = {}
        # Owns the lifetime of every opened file and Writer.
        self.exit_stack = contextlib.ExitStack()

    def __enter__(self):
        self.exit_stack.__enter__()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.exit_stack.__exit__(exc_type, exc_value, traceback)

    def get_writer_for(self, table_name: str):
        """Return (creating it on first use) the ORC writer for *table_name*.

        Each writer targets a uniquely-named file under
        ``<export_path>/<table_name>/graph-<uuid>.orc``; both the file and the
        writer are registered on the exit stack so they are closed on exit.
        """
        if table_name not in self.writers:
            object_type_dir = self.export_path / table_name
            object_type_dir.mkdir(exist_ok=True)
            unique_id = str(uuid.uuid4())
            export_file = object_type_dir / ("graph-{}.orc".format(unique_id))
            export_obj = self.exit_stack.enter_context(export_file.open("wb"))
            self.writers[table_name] = self.exit_stack.enter_context(
                Writer(export_obj, EXPORT_SCHEMA[table_name])
            )
        return self.writers[table_name]

    def process_origin(self, origin):
        origin_writer = self.get_writer_for("origin")
        origin_writer.write((origin["url"],))

    def process_origin_visit(self, visit):
        origin_visit_writer = self.get_writer_for("origin_visit")
        origin_visit_writer.write(
            (visit["origin"], visit["visit"], visit["date"], visit["type"])
        )

    def process_origin_visit_status(self, visit_status):
        origin_visit_status_writer = self.get_writer_for("origin_visit_status")
        origin_visit_status_writer.write(
            (
                visit_status["origin"],
                visit_status["visit"],
                visit_status["date"],
                visit_status["status"],
                hash_to_hex_or_none(visit_status["snapshot"]),
            )
        )

    def process_snapshot(self, snapshot):
        # Optionally strip pull-request branches before export.
        if self.config.get("remove_pull_requests"):
            remove_pull_requests(snapshot)
        snapshot_writer = self.get_writer_for("snapshot")
        snapshot_writer.write((hash_to_hex_or_none(snapshot["id"]),))

        snapshot_branch_writer = self.get_writer_for("snapshot_branch")
        for branch_name, branch in snapshot["branches"].items():
            if branch is None:
                # Dangling branch: nothing to export.
                continue
            snapshot_branch_writer.write(
                (
                    hash_to_hex_or_none(snapshot["id"]),
                    branch_name,
                    hash_to_hex_or_none(branch["target"]),
                    branch["target_type"],
                )
            )

    def process_release(self, release):
        release_writer = self.get_writer_for("release")
        # NOTE(review): releases may lack an author (anonymous release) —
        # guard against None rather than crashing; TODO confirm against the
        # swh.model release schema.
        author = release["author"]
        release_writer.write(
            (
                hash_to_hex_or_none(release["id"]),
                release["name"],
                release["message"],
                hash_to_hex_or_none(release["target"]),
                release["target_type"],
                author["fullname"] if author is not None else None,
                swh_date_to_datetime(release["date"]),
                swh_date_to_offset(release["date"]),
            )
        )

    def process_revision(self, revision):
        # (was misnamed ``release_writer`` — copy/paste from process_release)
        revision_writer = self.get_writer_for("revision")
        revision_writer.write(
            (
                hash_to_hex_or_none(revision["id"]),
                revision["message"],
                revision["author"]["fullname"],
                swh_date_to_datetime(revision["date"]),
                swh_date_to_offset(revision["date"]),
                revision["committer"]["fullname"],
                swh_date_to_datetime(revision["committer_date"]),
                swh_date_to_offset(revision["committer_date"]),
                hash_to_hex_or_none(revision["directory"]),
            )
        )

    def process_directory(self, directory):
        directory_writer = self.get_writer_for("directory")
        directory_writer.write((hash_to_hex_or_none(directory["id"]),))

        directory_entry_writer = self.get_writer_for("directory_entry")
        for entry in directory["entries"]:
            directory_entry_writer.write(
                (
                    hash_to_hex_or_none(directory["id"]),
                    entry["name"],
                    entry["type"],
                    hash_to_hex_or_none(entry["target"]),
                    entry["perms"],
                )
            )

    def process_content(self, content):
        content_writer = self.get_writer_for("content")
        content_writer.write(
            (
                hash_to_hex_or_none(content["sha1"]),
                hash_to_hex_or_none(content["sha1_git"]),
                hash_to_hex_or_none(content["sha256"]),
                hash_to_hex_or_none(content["blake2s256"]),
                content["length"],
                content["status"],
            )
        )

    def process_skipped_content(self, skipped_content):
        skipped_content_writer = self.get_writer_for("skipped_content")
        skipped_content_writer.write(
            (
                hash_to_hex_or_none(skipped_content["sha1"]),
                hash_to_hex_or_none(skipped_content["sha1_git"]),
                hash_to_hex_or_none(skipped_content["sha256"]),
                hash_to_hex_or_none(skipped_content["blake2s256"]),
                skipped_content["length"],
                skipped_content["status"],
                skipped_content["reason"],
            )
        )


# =========================================================================
# swh/dataset/test/test_orc.py
# (new file added by the same patch; original header:
#  diff --git a/swh/dataset/test/test_orc.py ... index 0000000..ef89b97)
# =========================================================================
import collections
from pathlib import Path
import tempfile

import pyorc
import pytest

from swh.dataset.exporters.orc import (
    ORCExporter,
    hash_to_hex_or_none,
    swh_date_to_datetime,
    swh_date_to_offset,
)
from swh.model.tests.swh_model_data import TEST_OBJECTS


@pytest.fixture
def exporter():
    """Return a callable that runs the ORC exporter over *messages* and
    reads every produced ORC file back as a ``{table_name: set(rows)}`` dict.
    """

    def wrapped(messages, config=None):
        with tempfile.TemporaryDirectory() as tmpname:
            tmppath = Path(tmpname)
            if config is None:
                config = {}
            with ORCExporter(config, tmppath) as exporter:
                for object_type, objects in messages.items():
                    for obj in objects:
                        exporter.process_object(object_type, obj.to_dict())
            res = collections.defaultdict(set)
            for obj_type_dir in tmppath.iterdir():
                for orc_file in obj_type_dir.iterdir():
                    with orc_file.open("rb") as orc_obj:
                        res[obj_type_dir.name] |= set(pyorc.Reader(orc_obj))
            return res

    return wrapped


def test_export_origin(exporter):
    obj_type = "origin"
    output = exporter({obj_type: TEST_OBJECTS[obj_type]})
    for obj in TEST_OBJECTS[obj_type]:
        assert (obj.url,) in output[obj_type]


def test_export_origin_visit(exporter):
    obj_type = "origin_visit"
    output = exporter({obj_type: TEST_OBJECTS[obj_type]})
    for obj in TEST_OBJECTS[obj_type]:
        assert (obj.origin, obj.visit, obj.date, obj.type) in output[obj_type]


def test_export_origin_visit_status(exporter):
    obj_type = "origin_visit_status"
    output = exporter({obj_type: TEST_OBJECTS[obj_type]})
    for obj in TEST_OBJECTS[obj_type]:
        assert (
            obj.origin,
            obj.visit,
            obj.date,
            obj.status,
            hash_to_hex_or_none(obj.snapshot),
        ) in output[obj_type]


def test_export_snapshot(exporter):
    obj_type = "snapshot"
    output = exporter({obj_type: TEST_OBJECTS[obj_type]})
    for obj in TEST_OBJECTS[obj_type]:
        assert (hash_to_hex_or_none(obj.id),) in output["snapshot"]
        for branch_name, branch in obj.branches.items():
            if branch is None:
                continue
            assert (
                hash_to_hex_or_none(obj.id),
                branch_name,
                hash_to_hex_or_none(branch.target),
                str(branch.target_type.value),
            ) in output["snapshot_branch"]


def test_export_release(exporter):
    obj_type = "release"
    output = exporter({obj_type: TEST_OBJECTS[obj_type]})
    for obj in TEST_OBJECTS[obj_type]:
        assert (
            hash_to_hex_or_none(obj.id),
            obj.name,
            obj.message,
            hash_to_hex_or_none(obj.target),
            obj.target_type.value,
            obj.author.fullname,
            swh_date_to_datetime(obj.date.to_dict()),
            swh_date_to_offset(obj.date.to_dict()),
        ) in output[obj_type]


def test_export_revision(exporter):
    obj_type = "revision"
    output = exporter({obj_type: TEST_OBJECTS[obj_type]})
    for obj in TEST_OBJECTS[obj_type]:
        assert (
            hash_to_hex_or_none(obj.id),
            obj.message,
            obj.author.fullname,
            swh_date_to_datetime(obj.date.to_dict()),
            swh_date_to_offset(obj.date.to_dict()),
            obj.committer.fullname,
            swh_date_to_datetime(obj.committer_date.to_dict()),
            swh_date_to_offset(obj.committer_date.to_dict()),
            hash_to_hex_or_none(obj.directory),
        ) in output[obj_type]


def test_export_directory(exporter):
    obj_type = "directory"
    output = exporter({obj_type: TEST_OBJECTS[obj_type]})
    for obj in TEST_OBJECTS[obj_type]:
        assert (hash_to_hex_or_none(obj.id),) in output["directory"]
        for entry in obj.entries:
            assert (
                hash_to_hex_or_none(obj.id),
                entry.name,
                entry.type,
                hash_to_hex_or_none(entry.target),
                entry.perms,
            ) in output["directory_entry"]


def test_export_content(exporter):
    obj_type = "content"
    output = exporter({obj_type: TEST_OBJECTS[obj_type]})
    for obj in TEST_OBJECTS[obj_type]:
        assert (
            hash_to_hex_or_none(obj.sha1),
            hash_to_hex_or_none(obj.sha1_git),
            hash_to_hex_or_none(obj.sha256),
            hash_to_hex_or_none(obj.blake2s256),
            obj.length,
            obj.status,
        ) in output[obj_type]


def test_export_skipped_content(exporter):
    obj_type = "skipped_content"
    output = exporter({obj_type: TEST_OBJECTS[obj_type]})
    for obj in TEST_OBJECTS[obj_type]:
        assert (
            hash_to_hex_or_none(obj.sha1),
            hash_to_hex_or_none(obj.sha1_git),
            hash_to_hex_or_none(obj.sha256),
            hash_to_hex_or_none(obj.blake2s256),
            obj.length,
            obj.status,
            obj.reason,
        ) in output[obj_type]