Changeset View
Changeset View
Standalone View
Standalone View
swh/dataset/exporters/orc.py
- This file was added.
# Copyright (C) 2020 The Software Heritage developers | |||||
# See the AUTHORS file at the top-level directory of this distribution | |||||
# License: GNU General Public License version 3, or any later version | |||||
# See top-level LICENSE file for more information | |||||
import contextlib | |||||
from datetime import datetime | |||||
import pathlib | |||||
import uuid | |||||
from pyorc import BigInt, Binary, Int, SmallInt, String, Struct, Timestamp, Writer | |||||
from swh.dataset.exporter import ExporterDispatch | |||||
from swh.dataset.utils import remove_pull_requests | |||||
from swh.model.hashutil import hash_to_hex | |||||
# fmt: off | |||||
EXPORT_SCHEMA = { | |||||
'origin': Struct( | |||||
url=String() | |||||
), | |||||
'origin_visit': Struct( | |||||
origin=String(), | |||||
visit=BigInt(), | |||||
date=Timestamp(), | |||||
type=String(), | |||||
), | |||||
'origin_visit_status': Struct( | |||||
origin=String(), | |||||
visit=BigInt(), | |||||
date=Timestamp(), | |||||
status=String(), | |||||
snapshot=String(), | |||||
), | |||||
'snapshot': Struct( | |||||
id=String(), | |||||
), | |||||
'snapshot_branch': Struct( | |||||
snapshot_id=String(), | |||||
name=Binary(), | |||||
target=String(), | |||||
target_type=String(), | |||||
), | |||||
'release': Struct( | |||||
id=String(), | |||||
name=Binary(), | |||||
message=Binary(), | |||||
target=String(), | |||||
target_type=String(), | |||||
author=Binary(), | |||||
date=Timestamp(), | |||||
date_offset=SmallInt(), | |||||
), | |||||
'revision': Struct( | |||||
id=String(), | |||||
message=Binary(), | |||||
author=Binary(), | |||||
date=Timestamp(), | |||||
date_offset=SmallInt(), | |||||
committer=Binary(), | |||||
committer_date=Timestamp(), | |||||
committer_offset=SmallInt(), | |||||
directory=String(), | |||||
), | |||||
'directory': Struct( | |||||
id=String(), | |||||
), | |||||
'directory_entry': Struct( | |||||
directory_id=String(), | |||||
name=Binary(), | |||||
type=String(), | |||||
target=String(), | |||||
perms=Int(), | |||||
), | |||||
'content': Struct( | |||||
sha1=String(), | |||||
sha1_git=String(), | |||||
sha256=String(), | |||||
blake2s256=String(), | |||||
length=BigInt(), | |||||
status=String(), | |||||
), | |||||
'skipped_content': Struct( | |||||
sha1=String(), | |||||
sha1_git=String(), | |||||
sha256=String(), | |||||
blake2s256=String(), | |||||
length=BigInt(), | |||||
status=String(), | |||||
reason=String(), | |||||
), | |||||
} | |||||
# fmt: on | |||||
def hash_to_hex_or_none(hash): | |||||
return hash_to_hex(hash) if hash is not None else None | |||||
def swh_date_to_datetime(obj): | |||||
if obj is None or obj["timestamp"] is None: | |||||
return None | |||||
return datetime.fromtimestamp( | |||||
obj["timestamp"]["seconds"] + (obj["timestamp"]["microseconds"] / 1e6) | |||||
) | |||||
olasd: According to python docs datetime.fromtimestamp uses the UNIX localtime() and can overflow.
I… | |||||
def swh_date_to_offset(obj): | |||||
if obj is None: | |||||
return None | |||||
return obj["offset"] | |||||
class ORCExporter(ExporterDispatch): | |||||
""" | |||||
Implementation of an exporter which writes the entire graph dataset as | |||||
ORC files. Useful for large scale processing, notably on cloud instances | |||||
(e.g BigQuery, Amazon Athena, Azure). | |||||
""" | |||||
def __init__(self, config, export_path, **kwargs): | |||||
super().__init__(config) | |||||
self.export_path = pathlib.Path(export_path) | |||||
self.export_path.mkdir(exist_ok=True, parents=True) | |||||
self.writers = {} | |||||
self.exit_stack = contextlib.ExitStack() | |||||
def __enter__(self): | |||||
self.exit_stack.__enter__() | |||||
return self | |||||
def __exit__(self, exc_type, exc_value, traceback): | |||||
self.exit_stack.__exit__(exc_type, exc_value, traceback) | |||||
def get_writer_for(self, table_name: str): | |||||
if table_name not in self.writers: | |||||
object_type_dir = self.export_path / table_name | |||||
object_type_dir.mkdir(exist_ok=True) | |||||
unique_id = str(uuid.uuid4()) | |||||
export_file = object_type_dir / ("graph-{}.orc".format(unique_id)) | |||||
export_obj = self.exit_stack.enter_context(export_file.open("wb")) | |||||
self.writers[table_name] = self.exit_stack.enter_context( | |||||
Writer(export_obj, EXPORT_SCHEMA[table_name]) | |||||
) | |||||
return self.writers[table_name] | |||||
def process_origin(self, origin): | |||||
origin_writer = self.get_writer_for("origin") | |||||
origin_writer.write((origin["url"],)) | |||||
def process_origin_visit(self, visit): | |||||
origin_visit_writer = self.get_writer_for("origin_visit") | |||||
origin_visit_writer.write( | |||||
(visit["origin"], visit["visit"], visit["date"], visit["type"],) | |||||
) | |||||
def process_origin_visit_status(self, visit_status): | |||||
origin_visit_status_writer = self.get_writer_for("origin_visit_status") | |||||
origin_visit_status_writer.write( | |||||
( | |||||
visit_status["origin"], | |||||
visit_status["visit"], | |||||
visit_status["date"], | |||||
visit_status["status"], | |||||
hash_to_hex_or_none(visit_status["snapshot"]), | |||||
) | |||||
) | |||||
def process_snapshot(self, snapshot): | |||||
if self.config.get("remove_pull_requests"): | |||||
remove_pull_requests(snapshot) | |||||
snapshot_writer = self.get_writer_for("snapshot") | |||||
snapshot_writer.write((hash_to_hex_or_none(snapshot["id"]),)) | |||||
snapshot_branch_writer = self.get_writer_for("snapshot_branch") | |||||
for branch_name, branch in snapshot["branches"].items(): | |||||
if branch is None: | |||||
continue | |||||
snapshot_branch_writer.write( | |||||
( | |||||
hash_to_hex_or_none(snapshot["id"]), | |||||
branch_name, | |||||
hash_to_hex_or_none(branch["target"]), | |||||
branch["target_type"], | |||||
olasdUnsubmitted Not Done Inline ActionsThis is going to look weird for alias branches (but I don't think you can do much better...) Ah, after reading through, you resolve alias branches. Nevermind. (I guess this needs to be documented in the export format!) olasd: This is going to look weird for alias branches (but I don't think you can do much better...)… | |||||
) | |||||
) | |||||
def process_release(self, release): | |||||
release_writer = self.get_writer_for("release") | |||||
release_writer.write( | |||||
( | |||||
hash_to_hex_or_none(release["id"]), | |||||
release["name"], | |||||
release["message"], | |||||
hash_to_hex_or_none(release["target"]), | |||||
release["target_type"], | |||||
release["author"]["fullname"], | |||||
swh_date_to_datetime(release["date"]), | |||||
swh_date_to_offset(release["date"]), | |||||
) | |||||
) | |||||
def process_revision(self, revision): | |||||
release_writer = self.get_writer_for("revision") | |||||
release_writer.write( | |||||
( | |||||
hash_to_hex_or_none(revision["id"]), | |||||
revision["message"], | |||||
revision["author"]["fullname"], | |||||
swh_date_to_datetime(revision["date"]), | |||||
swh_date_to_offset(revision["date"]), | |||||
revision["committer"]["fullname"], | |||||
swh_date_to_datetime(revision["committer_date"]), | |||||
swh_date_to_offset(revision["committer_date"]), | |||||
hash_to_hex_or_none(revision["directory"]), | |||||
) | |||||
) | |||||
def process_directory(self, directory): | |||||
directory_writer = self.get_writer_for("directory") | |||||
directory_writer.write((hash_to_hex_or_none(directory["id"]),)) | |||||
directory_entry_writer = self.get_writer_for("directory_entry") | |||||
for entry in directory["entries"]: | |||||
directory_entry_writer.write( | |||||
( | |||||
hash_to_hex_or_none(directory["id"]), | |||||
entry["name"], | |||||
entry["type"], | |||||
hash_to_hex_or_none(entry["target"]), | |||||
entry["perms"], | |||||
) | |||||
) | |||||
def process_content(self, content): | |||||
content_writer = self.get_writer_for("content") | |||||
content_writer.write( | |||||
( | |||||
hash_to_hex_or_none(content["sha1"]), | |||||
hash_to_hex_or_none(content["sha1_git"]), | |||||
hash_to_hex_or_none(content["sha256"]), | |||||
hash_to_hex_or_none(content["blake2s256"]), | |||||
content["length"], | |||||
content["status"], | |||||
) | |||||
) | |||||
def process_skipped_content(self, skipped_content): | |||||
skipped_content_writer = self.get_writer_for("skipped_content") | |||||
skipped_content_writer.write( | |||||
( | |||||
hash_to_hex_or_none(skipped_content["sha1"]), | |||||
hash_to_hex_or_none(skipped_content["sha1_git"]), | |||||
hash_to_hex_or_none(skipped_content["sha256"]), | |||||
hash_to_hex_or_none(skipped_content["blake2s256"]), | |||||
skipped_content["length"], | |||||
skipped_content["status"], | |||||
skipped_content["reason"], | |||||
) | |||||
) |
According to python docs datetime.fromtimestamp uses the UNIX localtime() and can overflow.
I suggest you use the other idiom from the docs:
This form also has the benefit of not going through a float and potentially losing precision.
It also sets the tzinfo object to something explicit; I think your version would have given you a /localized/ datetime?