diff --git a/swh/dataset/exporters/orc.py b/swh/dataset/exporters/orc.py index 451ff26..a792d7d 100644 --- a/swh/dataset/exporters/orc.py +++ b/swh/dataset/exporters/orc.py @@ -1,193 +1,203 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import uuid from pyorc import BigInt, Binary, Int, SmallInt, String, Struct, Timestamp, Writer from swh.dataset.exporter import ExporterDispatch from swh.dataset.relational import TABLES from swh.dataset.utils import remove_pull_requests from swh.model.hashutil import hash_to_hex ORC_TYPE_MAP = { "string": String, "smallint": SmallInt, "int": Int, "bigint": BigInt, "timestamp": Timestamp, "binary": Binary, } EXPORT_SCHEMA = { table_name: Struct( **{ column_name: ORC_TYPE_MAP[column_type]() for column_name, column_type in columns } ) for table_name, columns in TABLES.items() } def hash_to_hex_or_none(hash): return hash_to_hex(hash) if hash is not None else None def swh_date_to_datetime(obj): if obj is None or obj["timestamp"] is None: return None return datetime.datetime( 1970, 1, 1, tzinfo=datetime.timezone.utc ) + datetime.timedelta( seconds=obj["timestamp"]["seconds"], microseconds=obj["timestamp"]["microseconds"], ) def swh_date_to_offset(obj): if obj is None: return None return obj["offset"] class ORCExporter(ExporterDispatch): """ Implementation of an exporter which writes the entire graph dataset as ORC files. Useful for large scale processing, notably on cloud instances (e.g BigQuery, Amazon Athena, Azure). """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.writers = {} def get_writer_for(self, table_name: str): if table_name not in self.writers: object_type_dir = self.export_path / table_name object_type_dir.mkdir(exist_ok=True) unique_id = str(uuid.uuid4()) export_file = object_type_dir / ("graph-{}.orc".format(unique_id)) export_obj = self.exit_stack.enter_context(export_file.open("wb")) self.writers[table_name] = self.exit_stack.enter_context( Writer(export_obj, EXPORT_SCHEMA[table_name]) ) return self.writers[table_name] def process_origin(self, origin): origin_writer = self.get_writer_for("origin") origin_writer.write((origin["url"],)) def process_origin_visit(self, visit): origin_visit_writer = self.get_writer_for("origin_visit") origin_visit_writer.write( (visit["origin"], visit["visit"], visit["date"], visit["type"],) ) def process_origin_visit_status(self, visit_status): origin_visit_status_writer = self.get_writer_for("origin_visit_status") origin_visit_status_writer.write( ( visit_status["origin"], visit_status["visit"], visit_status["date"], visit_status["status"], hash_to_hex_or_none(visit_status["snapshot"]), ) ) def process_snapshot(self, snapshot): if self.config.get("remove_pull_requests"): remove_pull_requests(snapshot) snapshot_writer = self.get_writer_for("snapshot") snapshot_writer.write((hash_to_hex_or_none(snapshot["id"]),)) snapshot_branch_writer = self.get_writer_for("snapshot_branch") for branch_name, branch in snapshot["branches"].items(): if branch is None: continue snapshot_branch_writer.write( ( hash_to_hex_or_none(snapshot["id"]), branch_name, hash_to_hex_or_none(branch["target"]), branch["target_type"], ) ) def process_release(self, release): release_writer = self.get_writer_for("release") release_writer.write( ( hash_to_hex_or_none(release["id"]), release["name"], release["message"], hash_to_hex_or_none(release["target"]), release["target_type"], (release.get("author") or {}).get("fullname"), swh_date_to_datetime(release["date"]), swh_date_to_offset(release["date"]), ) ) def process_revision(self, revision): release_writer = self.get_writer_for("revision") release_writer.write( ( hash_to_hex_or_none(revision["id"]), revision["message"], revision["author"]["fullname"], swh_date_to_datetime(revision["date"]), swh_date_to_offset(revision["date"]), revision["committer"]["fullname"], swh_date_to_datetime(revision["committer_date"]), swh_date_to_offset(revision["committer_date"]), hash_to_hex_or_none(revision["directory"]), ) ) + revision_history_writer = self.get_writer_for("revision_history") + for i, parent_id in enumerate(revision["parents"]): + revision_history_writer.write( + ( + hash_to_hex_or_none(revision["id"]), + hash_to_hex_or_none(parent_id), + i, + ) + ) + def process_directory(self, directory): directory_writer = self.get_writer_for("directory") directory_writer.write((hash_to_hex_or_none(directory["id"]),)) directory_entry_writer = self.get_writer_for("directory_entry") for entry in directory["entries"]: directory_entry_writer.write( ( hash_to_hex_or_none(directory["id"]), entry["name"], entry["type"], hash_to_hex_or_none(entry["target"]), entry["perms"], ) ) def process_content(self, content): content_writer = self.get_writer_for("content") content_writer.write( ( hash_to_hex_or_none(content["sha1"]), hash_to_hex_or_none(content["sha1_git"]), hash_to_hex_or_none(content["sha256"]), hash_to_hex_or_none(content["blake2s256"]), content["length"], content["status"], ) ) def process_skipped_content(self, skipped_content): skipped_content_writer = self.get_writer_for("skipped_content") skipped_content_writer.write( ( hash_to_hex_or_none(skipped_content["sha1"]), hash_to_hex_or_none(skipped_content["sha1_git"]), hash_to_hex_or_none(skipped_content["sha256"]), hash_to_hex_or_none(skipped_content["blake2s256"]), skipped_content["length"], skipped_content["status"], skipped_content["reason"], ) ) diff --git a/swh/dataset/relational.py b/swh/dataset/relational.py index 614a134..c9f35b1 100644 --- a/swh/dataset/relational.py +++ b/swh/dataset/relational.py @@ -1,82 +1,87 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # fmt: off TABLES = { "origin": [ ("url", "string"), ], "origin_visit": [ ("origin", "string"), ("visit", "bigint"), ("date", "timestamp"), ("type", "string"), ], "origin_visit_status": [ ("origin", "string"), ("visit", "bigint"), ("date", "timestamp"), ("status", "string"), ("snapshot", "string"), ], "snapshot": [ ("id", "string"), ], "snapshot_branch": [ ("snapshot_id", "string"), ("name", "binary"), ("target", "string"), ("target_type", "string"), ], "release": [ ("id", "string"), ("name", "binary"), ("message", "binary"), ("target", "string"), ("target_type", "string"), ("author", "binary"), ("date", "timestamp"), ("date_offset", "smallint"), ], "revision": [ ("id", "string"), ("message", "binary"), ("author", "binary"), ("date", "timestamp"), ("date_offset", "smallint"), ("committer", "binary"), ("committer_date", "timestamp"), ("committer_offset", "smallint"), ("directory", "string"), ], + "revision_history": [ + ("id", "string"), + ("parent_id", "string"), + ("parent_rank", "int"), + ], "directory": [ ("id", "string"), ], "directory_entry": [ ("directory_id", "string"), ("name", "binary"), ("type", "string"), ("target", "string"), ("perms", "int"), ], "content": [ ("sha1", "string"), ("sha1_git", "string"), ("sha256", "string"), ("blake2s256", "string"), ("length", "bigint"), ("status", "string"), ], "skipped_content": [ ("sha1", "string"), ("sha1_git", "string"), ("sha256", "string"), ("blake2s256", "string"), ("length", "bigint"), ("status", "string"), ("reason", "string"), ], } # fmt: on diff --git a/swh/dataset/test/test_orc.py b/swh/dataset/test/test_orc.py index 782900d..cdbfbd1 100644 --- a/swh/dataset/test/test_orc.py +++ b/swh/dataset/test/test_orc.py @@ -1,155 +1,161 @@ import collections from pathlib import Path import tempfile import pyorc import pytest from swh.dataset.exporters.orc import ( ORCExporter, hash_to_hex_or_none, swh_date_to_datetime, swh_date_to_offset, ) from swh.model.tests.swh_model_data import TEST_OBJECTS @pytest.fixture def exporter(): def wrapped(messages, config=None): with tempfile.TemporaryDirectory() as tmpname: tmppath = Path(tmpname) if config is None: config = {} with ORCExporter(config, tmppath) as exporter: for object_type, objects in messages.items(): for obj in objects: exporter.process_object(object_type, obj.to_dict()) res = collections.defaultdict(set) for obj_type_dir in tmppath.iterdir(): for orc_file in obj_type_dir.iterdir(): with orc_file.open("rb") as orc_obj: res[obj_type_dir.name] |= set(pyorc.Reader(orc_obj)) return res return wrapped def test_export_origin(exporter): obj_type = "origin" output = exporter({obj_type: TEST_OBJECTS[obj_type]}) for obj in TEST_OBJECTS[obj_type]: assert (obj.url,) in output[obj_type] def test_export_origin_visit(exporter): obj_type = "origin_visit" output = exporter({obj_type: TEST_OBJECTS[obj_type]}) for obj in TEST_OBJECTS[obj_type]: assert (obj.origin, obj.visit, obj.date, obj.type) in output[obj_type] def test_export_origin_visit_status(exporter): obj_type = "origin_visit_status" output = exporter({obj_type: TEST_OBJECTS[obj_type]}) for obj in TEST_OBJECTS[obj_type]: assert ( obj.origin, obj.visit, obj.date, obj.status, hash_to_hex_or_none(obj.snapshot), ) in output[obj_type] def test_export_snapshot(exporter): obj_type = "snapshot" output = exporter({obj_type: TEST_OBJECTS[obj_type]}) for obj in TEST_OBJECTS[obj_type]: assert (hash_to_hex_or_none(obj.id),) in output["snapshot"] for branch_name, branch in obj.branches.items(): if branch is None: continue assert ( hash_to_hex_or_none(obj.id), branch_name, hash_to_hex_or_none(branch.target), str(branch.target_type.value), ) in output["snapshot_branch"] def test_export_release(exporter): obj_type = "release" output = exporter({obj_type: TEST_OBJECTS[obj_type]}) for obj in TEST_OBJECTS[obj_type]: assert ( hash_to_hex_or_none(obj.id), obj.name, obj.message, hash_to_hex_or_none(obj.target), obj.target_type.value, obj.author.fullname if obj.author else None, swh_date_to_datetime(obj.date.to_dict()) if obj.date else None, swh_date_to_offset(obj.date.to_dict()) if obj.date else None, ) in output[obj_type] def test_export_revision(exporter): obj_type = "revision" output = exporter({obj_type: TEST_OBJECTS[obj_type]}) for obj in TEST_OBJECTS[obj_type]: assert ( hash_to_hex_or_none(obj.id), obj.message, obj.author.fullname, swh_date_to_datetime(obj.date.to_dict()), swh_date_to_offset(obj.date.to_dict()), obj.committer.fullname, swh_date_to_datetime(obj.committer_date.to_dict()), swh_date_to_offset(obj.committer_date.to_dict()), hash_to_hex_or_none(obj.directory), - ) in output[obj_type] + ) in output["revision"] + for i, parent in enumerate(obj.parents): + assert ( + hash_to_hex_or_none(obj.id), + hash_to_hex_or_none(parent), + i, + ) in output["revision_history"] def test_export_directory(exporter): obj_type = "directory" output = exporter({obj_type: TEST_OBJECTS[obj_type]}) for obj in TEST_OBJECTS[obj_type]: assert (hash_to_hex_or_none(obj.id),) in output["directory"] for entry in obj.entries: assert ( hash_to_hex_or_none(obj.id), entry.name, entry.type, hash_to_hex_or_none(entry.target), entry.perms, ) in output["directory_entry"] def test_export_content(exporter): obj_type = "content" output = exporter({obj_type: TEST_OBJECTS[obj_type]}) for obj in TEST_OBJECTS[obj_type]: assert ( hash_to_hex_or_none(obj.sha1), hash_to_hex_or_none(obj.sha1_git), hash_to_hex_or_none(obj.sha256), hash_to_hex_or_none(obj.blake2s256), obj.length, obj.status, ) in output[obj_type] def test_export_skipped_content(exporter): obj_type = "skipped_content" output = exporter({obj_type: TEST_OBJECTS[obj_type]}) for obj in TEST_OBJECTS[obj_type]: assert ( hash_to_hex_or_none(obj.sha1), hash_to_hex_or_none(obj.sha1_git), hash_to_hex_or_none(obj.sha256), hash_to_hex_or_none(obj.blake2s256), obj.length, obj.status, obj.reason, ) in output[obj_type]