diff --git a/requirements-swh.txt b/requirements-swh.txt
index 792bf2b..ffa0be8 100644
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,5 +1,5 @@
 # Add here internal Software Heritage dependencies, one per line.
-swh.core[http] >= 0.3
+swh.core[http] >= 2
 swh.journal
-swh.model >= 1.0.0
+swh.model >= 4.3
 swh.storage
diff --git a/swh/dataset/exporters/orc.py b/swh/dataset/exporters/orc.py
index 1fa483d..a4b1cd7 100644
--- a/swh/dataset/exporters/orc.py
+++ b/swh/dataset/exporters/orc.py
@@ -1,217 +1,262 @@
 # Copyright (C) 2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-import datetime
+from datetime import datetime
+import math
+from typing import Any, Optional, Tuple, Type, cast
 import uuid
 
 from pyorc import (
     BigInt,
     Binary,
     CompressionKind,
     Int,
     SmallInt,
     String,
     Struct,
     Timestamp,
+    TypeKind,
     Writer,
 )
+from pyorc.converters import ORCConverter
 
 from swh.dataset.exporter import ExporterDispatch
 from swh.dataset.relational import TABLES
 from swh.dataset.utils import remove_pull_requests
 from swh.model.hashutil import hash_to_hex
+from swh.model.model import TimestampWithTimezone
 
 ORC_TYPE_MAP = {
     "string": String,
     "smallint": SmallInt,
     "int": Int,
     "bigint": BigInt,
     "timestamp": Timestamp,
     "binary": Binary,
 }
 
 EXPORT_SCHEMA = {
     table_name: Struct(
         **{
             column_name: ORC_TYPE_MAP[column_type]()
             for column_name, column_type in columns
         }
     )
     for table_name, columns in TABLES.items()
 }
 
 
 def hash_to_hex_or_none(hash):
     return hash_to_hex(hash) if hash is not None else None
 
 
-def swh_date_to_datetime(obj):
+def swh_date_to_tuple(obj):
     if obj is None or obj["timestamp"] is None:
-        return None
-    return datetime.datetime(
-        1970, 1, 1, tzinfo=datetime.timezone.utc
-    ) + datetime.timedelta(
-        seconds=obj["timestamp"]["seconds"],
-        microseconds=obj["timestamp"]["microseconds"],
+        return (None, None, None)
+    offset_bytes = obj.get("offset_bytes")
+    if offset_bytes is None:
+        offset = obj.get("offset", 0)
+        negative = offset < 0 or obj.get("negative_utc", False)
+        (hours, minutes) = divmod(abs(offset), 60)
+        offset_bytes = f"{'-' if negative else '+'}{hours:02}{minutes:02}".encode()
+    else:
+        offset = TimestampWithTimezone._parse_offset_bytes(offset_bytes)
+    return (
+        (obj["timestamp"]["seconds"], obj["timestamp"]["microseconds"]),
+        offset,
+        offset_bytes,
     )
 
 
-def swh_date_to_offset(obj):
+def datetime_to_tuple(obj: Optional[datetime]) -> Optional[Tuple[int, int]]:
     if obj is None:
         return None
-    return obj["offset"]
+    return (math.floor(obj.timestamp()), obj.microsecond)
+
+
+class SWHTimestampConverter:
+    """An ORCConverter-compatible class to convert timestamps from/to ORC files.
+
+    Timestamps in Python are given as a pair (seconds, microseconds) and are
+    serialized as a pair (seconds, nanoseconds) in the ORC file.
+
+    Reimplemented because we do not want ORC timestamps to be read back as
+    Python datetime objects, since swh.model's Timestamp cannot be converted
+    to and from Python datetime objects without loss.
+    """
+
+    # use Any as the timezone annotation to make it easier to run mypy on
+    # Python < 3.9; besides, we do not use the timezone argument here.
+    @staticmethod
+    def from_orc(seconds: int, nanoseconds: int, timezone: Any,) -> Tuple[int, int]:
+        return (seconds, nanoseconds // 1000)
+
+    @staticmethod
+    def to_orc(
+        obj: Optional[Tuple[int, int]], timezone: Any,
+    ) -> Optional[Tuple[int, int]]:
+        if obj is None:
+            return None
+        return (obj[0], obj[1] * 1000 if obj[1] is not None else None)
 
 
 class ORCExporter(ExporterDispatch):
     """
     Implementation of an exporter which writes the entire graph dataset as
     ORC files. Useful for large scale processing, notably on cloud instances
     (e.g BigQuery, Amazon Athena, Azure).
     """
 
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
         self.writers = {}
 
     def get_writer_for(self, table_name: str):
         if table_name not in self.writers:
             object_type_dir = self.export_path / table_name
             object_type_dir.mkdir(exist_ok=True)
             unique_id = str(uuid.uuid4())
             export_file = object_type_dir / ("graph-{}.orc".format(unique_id))
             export_obj = self.exit_stack.enter_context(export_file.open("wb"))
             self.writers[table_name] = self.exit_stack.enter_context(
                 Writer(
                     export_obj,
                     EXPORT_SCHEMA[table_name],
                     compression=CompressionKind.ZSTD,
+                    converters={
+                        TypeKind.TIMESTAMP: cast(
+                            Type[ORCConverter], SWHTimestampConverter
+                        )
+                    },
                 )
             )
         return self.writers[table_name]
 
     def process_origin(self, origin):
         origin_writer = self.get_writer_for("origin")
         origin_writer.write((origin["url"],))
 
     def process_origin_visit(self, visit):
         origin_visit_writer = self.get_writer_for("origin_visit")
         origin_visit_writer.write(
-            (visit["origin"], visit["visit"], visit["date"], visit["type"],)
+            (
+                visit["origin"],
+                visit["visit"],
+                datetime_to_tuple(visit["date"]),
+                visit["type"],
+            )
         )
 
     def process_origin_visit_status(self, visit_status):
         origin_visit_status_writer = self.get_writer_for("origin_visit_status")
         origin_visit_status_writer.write(
             (
                 visit_status["origin"],
                 visit_status["visit"],
-                visit_status["date"],
+                datetime_to_tuple(visit_status["date"]),
                 visit_status["status"],
                 hash_to_hex_or_none(visit_status["snapshot"]),
             )
         )
 
     def process_snapshot(self, snapshot):
         if self.config.get("remove_pull_requests"):
             remove_pull_requests(snapshot)
         snapshot_writer = self.get_writer_for("snapshot")
         snapshot_writer.write((hash_to_hex_or_none(snapshot["id"]),))
 
         snapshot_branch_writer = self.get_writer_for("snapshot_branch")
         for branch_name, branch in snapshot["branches"].items():
             if branch is None:
                 continue
             snapshot_branch_writer.write(
                 (
                     hash_to_hex_or_none(snapshot["id"]),
                     branch_name,
                     hash_to_hex_or_none(branch["target"]),
                     branch["target_type"],
                 )
             )
 
     def process_release(self, release):
         release_writer = self.get_writer_for("release")
         release_writer.write(
             (
                 hash_to_hex_or_none(release["id"]),
                 release["name"],
                 release["message"],
                 hash_to_hex_or_none(release["target"]),
                 release["target_type"],
                 (release.get("author") or {}).get("fullname"),
-                swh_date_to_datetime(release["date"]),
-                swh_date_to_offset(release["date"]),
+                *swh_date_to_tuple(release["date"]),
             )
         )
 
     def process_revision(self, revision):
         release_writer = self.get_writer_for("revision")
         release_writer.write(
             (
                 hash_to_hex_or_none(revision["id"]),
                 revision["message"],
                 revision["author"]["fullname"],
-                swh_date_to_datetime(revision["date"]),
-                swh_date_to_offset(revision["date"]),
+                *swh_date_to_tuple(revision["date"]),
                 revision["committer"]["fullname"],
-                swh_date_to_datetime(revision["committer_date"]),
-                swh_date_to_offset(revision["committer_date"]),
+                *swh_date_to_tuple(revision["committer_date"]),
                 hash_to_hex_or_none(revision["directory"]),
             )
         )
 
         revision_history_writer = self.get_writer_for("revision_history")
         for i, parent_id in enumerate(revision["parents"]):
             revision_history_writer.write(
                 (
                     hash_to_hex_or_none(revision["id"]),
                     hash_to_hex_or_none(parent_id),
                     i,
                 )
             )
 
     def process_directory(self, directory):
         directory_writer = self.get_writer_for("directory")
         directory_writer.write((hash_to_hex_or_none(directory["id"]),))
 
         directory_entry_writer = self.get_writer_for("directory_entry")
         for entry in directory["entries"]:
             directory_entry_writer.write(
                 (
                     hash_to_hex_or_none(directory["id"]),
                     entry["name"],
                     entry["type"],
                     hash_to_hex_or_none(entry["target"]),
                     entry["perms"],
                 )
             )
 
     def process_content(self, content):
         content_writer = self.get_writer_for("content")
         content_writer.write(
             (
                 hash_to_hex_or_none(content["sha1"]),
                 hash_to_hex_or_none(content["sha1_git"]),
                 hash_to_hex_or_none(content["sha256"]),
                 hash_to_hex_or_none(content["blake2s256"]),
                 content["length"],
                 content["status"],
             )
         )
 
     def process_skipped_content(self, skipped_content):
         skipped_content_writer = self.get_writer_for("skipped_content")
         skipped_content_writer.write(
             (
                 hash_to_hex_or_none(skipped_content["sha1"]),
                 hash_to_hex_or_none(skipped_content["sha1_git"]),
                 hash_to_hex_or_none(skipped_content["sha256"]),
                 hash_to_hex_or_none(skipped_content["blake2s256"]),
                 skipped_content["length"],
                 skipped_content["status"],
                 skipped_content["reason"],
             )
         )
diff --git a/swh/dataset/relational.py b/swh/dataset/relational.py
index c9f35b1..f814921 100644
--- a/swh/dataset/relational.py
+++ b/swh/dataset/relational.py
@@ -1,87 +1,90 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 # fmt: off
 TABLES = {
     "origin": [
         ("url", "string"),
     ],
     "origin_visit": [
         ("origin", "string"),
         ("visit", "bigint"),
         ("date", "timestamp"),
         ("type", "string"),
     ],
     "origin_visit_status": [
         ("origin", "string"),
         ("visit", "bigint"),
         ("date", "timestamp"),
         ("status", "string"),
         ("snapshot", "string"),
     ],
     "snapshot": [
         ("id", "string"),
     ],
     "snapshot_branch": [
         ("snapshot_id", "string"),
         ("name", "binary"),
         ("target", "string"),
         ("target_type", "string"),
     ],
     "release": [
         ("id", "string"),
         ("name", "binary"),
         ("message", "binary"),
         ("target", "string"),
         ("target_type", "string"),
         ("author", "binary"),
         ("date", "timestamp"),
         ("date_offset", "smallint"),
+        ("date_raw_offset_bytes", "binary"),
     ],
     "revision": [
         ("id", "string"),
         ("message", "binary"),
         ("author", "binary"),
         ("date", "timestamp"),
         ("date_offset", "smallint"),
+        ("date_raw_offset_bytes", "binary"),
         ("committer", "binary"),
         ("committer_date", "timestamp"),
         ("committer_offset", "smallint"),
+        ("committer_date_raw_offset_bytes", "binary"),
         ("directory", "string"),
     ],
     "revision_history": [
         ("id", "string"),
         ("parent_id", "string"),
         ("parent_rank", "int"),
     ],
     "directory": [
         ("id", "string"),
     ],
     "directory_entry": [
         ("directory_id", "string"),
         ("name", "binary"),
         ("type", "string"),
         ("target", "string"),
         ("perms", "int"),
     ],
     "content": [
         ("sha1", "string"),
         ("sha1_git", "string"),
         ("sha256", "string"),
         ("blake2s256", "string"),
         ("length", "bigint"),
         ("status", "string"),
     ],
     "skipped_content": [
         ("sha1", "string"),
         ("sha1_git", "string"),
         ("sha256", "string"),
         ("blake2s256", "string"),
         ("length", "bigint"),
         ("status", "string"),
         ("reason", "string"),
     ],
 }
 # fmt: on
diff --git a/swh/dataset/test/test_orc.py b/swh/dataset/test/test_orc.py
index cdbfbd1..261774d 100644
--- a/swh/dataset/test/test_orc.py
+++ b/swh/dataset/test/test_orc.py
@@ -1,161 +1,198 @@
 import collections
 from pathlib import Path
 import tempfile
 
 import pyorc
 import pytest
 
 from swh.dataset.exporters.orc import (
     ORCExporter,
+    SWHTimestampConverter,
+    datetime_to_tuple,
     hash_to_hex_or_none,
-    swh_date_to_datetime,
-    swh_date_to_offset,
+    swh_date_to_tuple,
 )
 from swh.model.tests.swh_model_data import TEST_OBJECTS
 
 
 @pytest.fixture
 def exporter():
     def wrapped(messages, config=None):
         with tempfile.TemporaryDirectory() as tmpname:
             tmppath = Path(tmpname)
             if config is None:
                 config = {}
             with ORCExporter(config, tmppath) as exporter:
                 for object_type, objects in messages.items():
                     for obj in objects:
                         exporter.process_object(object_type, obj.to_dict())
             res = collections.defaultdict(set)
             for obj_type_dir in tmppath.iterdir():
                 for orc_file in obj_type_dir.iterdir():
                     with orc_file.open("rb") as orc_obj:
-                        res[obj_type_dir.name] |= set(pyorc.Reader(orc_obj))
+                        res[obj_type_dir.name] |= set(
+                            pyorc.Reader(
+                                orc_obj,
+                                converters={
+                                    pyorc.TypeKind.TIMESTAMP: SWHTimestampConverter
+                                },
+                            )
+                        )
             return res
 
     return wrapped
 
 
 def test_export_origin(exporter):
     obj_type = "origin"
     output = exporter({obj_type: TEST_OBJECTS[obj_type]})
     for obj in TEST_OBJECTS[obj_type]:
         assert (obj.url,) in output[obj_type]
 
 
 def test_export_origin_visit(exporter):
     obj_type = "origin_visit"
     output = exporter({obj_type: TEST_OBJECTS[obj_type]})
     for obj in TEST_OBJECTS[obj_type]:
-        assert (obj.origin, obj.visit, obj.date, obj.type) in output[obj_type]
+        assert (obj.origin, obj.visit, datetime_to_tuple(obj.date), obj.type) in output[
+            obj_type
+        ]
 
 
 def test_export_origin_visit_status(exporter):
     obj_type = "origin_visit_status"
     output = exporter({obj_type: TEST_OBJECTS[obj_type]})
     for obj in TEST_OBJECTS[obj_type]:
         assert (
             obj.origin,
             obj.visit,
-            obj.date,
+            datetime_to_tuple(obj.date),
             obj.status,
             hash_to_hex_or_none(obj.snapshot),
         ) in output[obj_type]
 
 
 def test_export_snapshot(exporter):
     obj_type = "snapshot"
     output = exporter({obj_type: TEST_OBJECTS[obj_type]})
     for obj in TEST_OBJECTS[obj_type]:
         assert (hash_to_hex_or_none(obj.id),) in output["snapshot"]
         for branch_name, branch in obj.branches.items():
             if branch is None:
                 continue
             assert (
                 hash_to_hex_or_none(obj.id),
                 branch_name,
                 hash_to_hex_or_none(branch.target),
                 str(branch.target_type.value),
             ) in output["snapshot_branch"]
 
 
 def test_export_release(exporter):
     obj_type = "release"
     output = exporter({obj_type: TEST_OBJECTS[obj_type]})
     for obj in TEST_OBJECTS[obj_type]:
         assert (
             hash_to_hex_or_none(obj.id),
             obj.name,
             obj.message,
             hash_to_hex_or_none(obj.target),
             obj.target_type.value,
             obj.author.fullname if obj.author else None,
-            swh_date_to_datetime(obj.date.to_dict()) if obj.date else None,
-            swh_date_to_offset(obj.date.to_dict()) if obj.date else None,
+            *swh_date_to_tuple(obj.date.to_dict() if obj.date is not None else None),
         ) in output[obj_type]
 
 
 def test_export_revision(exporter):
     obj_type = "revision"
     output = exporter({obj_type: TEST_OBJECTS[obj_type]})
     for obj in TEST_OBJECTS[obj_type]:
         assert (
             hash_to_hex_or_none(obj.id),
             obj.message,
             obj.author.fullname,
-            swh_date_to_datetime(obj.date.to_dict()),
-            swh_date_to_offset(obj.date.to_dict()),
+            *swh_date_to_tuple(obj.date.to_dict() if obj.date is not None else None),
             obj.committer.fullname,
-            swh_date_to_datetime(obj.committer_date.to_dict()),
-            swh_date_to_offset(obj.committer_date.to_dict()),
+            *swh_date_to_tuple(
+                obj.committer_date.to_dict() if obj.committer_date is not None else None
+            ),
             hash_to_hex_or_none(obj.directory),
         ) in output["revision"]
         for i, parent in enumerate(obj.parents):
             assert (
                 hash_to_hex_or_none(obj.id),
                 hash_to_hex_or_none(parent),
                 i,
             ) in output["revision_history"]
 
 
 def test_export_directory(exporter):
     obj_type = "directory"
     output = exporter({obj_type: TEST_OBJECTS[obj_type]})
     for obj in TEST_OBJECTS[obj_type]:
         assert (hash_to_hex_or_none(obj.id),) in output["directory"]
         for entry in obj.entries:
             assert (
                 hash_to_hex_or_none(obj.id),
                 entry.name,
                 entry.type,
                 hash_to_hex_or_none(entry.target),
                 entry.perms,
             ) in output["directory_entry"]
 
 
 def test_export_content(exporter):
     obj_type = "content"
     output = exporter({obj_type: TEST_OBJECTS[obj_type]})
     for obj in TEST_OBJECTS[obj_type]:
         assert (
             hash_to_hex_or_none(obj.sha1),
             hash_to_hex_or_none(obj.sha1_git),
             hash_to_hex_or_none(obj.sha256),
             hash_to_hex_or_none(obj.blake2s256),
             obj.length,
             obj.status,
         ) in output[obj_type]
 
 
 def test_export_skipped_content(exporter):
     obj_type = "skipped_content"
     output = exporter({obj_type: TEST_OBJECTS[obj_type]})
     for obj in TEST_OBJECTS[obj_type]:
         assert (
             hash_to_hex_or_none(obj.sha1),
             hash_to_hex_or_none(obj.sha1_git),
             hash_to_hex_or_none(obj.sha256),
             hash_to_hex_or_none(obj.blake2s256),
             obj.length,
             obj.status,
             obj.reason,
         ) in output[obj_type]
+
+
+def test_date_to_tuple():
+    ts = {"seconds": 123456, "microseconds": 1515}
+    assert swh_date_to_tuple({"timestamp": ts, "offset_bytes": b"+0100"}) == (
+        (123456, 1515),
+        60,
+        b"+0100",
+    )
+
+    assert swh_date_to_tuple(
+        {
+            "timestamp": ts,
+            "offset": 120,
+            "negative_utc": False,
+            "offset_bytes": b"+0100",
+        }
+    ) == ((123456, 1515), 60, b"+0100")
+
+    assert swh_date_to_tuple(
+        {"timestamp": ts, "offset": 120, "negative_utc": False,}
+    ) == ((123456, 1515), 120, b"+0200")
+
+    assert swh_date_to_tuple({"timestamp": ts, "offset": 0, "negative_utc": True,}) == (
+        (123456, 1515),
+        0,
+        b"-0000",
+    )
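
Reviewer note (not part of the patch): a minimal sketch of how the new pieces fit
together, using only the helpers introduced above (swh_date_to_tuple and
SWHTimestampConverter). The `date` dict below is an illustrative example value,
and the offset parsing assumes swh.model >= 4.3 as pinned in requirements-swh.txt.

    # Illustrative sketch, not part of the patch: a swh date dict expands into
    # the three ORC columns (date, date_offset, date_raw_offset_bytes), and the
    # converter stores the timestamp pair without going through datetime.
    from swh.dataset.exporters.orc import SWHTimestampConverter, swh_date_to_tuple

    # Example value: a release/revision date as produced by swh.model's to_dict().
    date = {
        "timestamp": {"seconds": 1234567890, "microseconds": 250},
        "offset_bytes": b"+0200",
    }

    timestamp, offset, offset_bytes = swh_date_to_tuple(date)
    assert timestamp == (1234567890, 250)  # (seconds, microseconds)
    assert offset == 120                   # minutes east of UTC, parsed from b"+0200"
    assert offset_bytes == b"+0200"        # raw offset bytes, preserved losslessly

    # On disk, the (seconds, microseconds) pair is serialized as
    # (seconds, nanoseconds); from_orc converts it back.
    on_disk = SWHTimestampConverter.to_orc(timestamp, None)
    assert on_disk == (1234567890, 250_000)
    assert SWHTimestampConverter.from_orc(*on_disk, None) == timestamp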