diff --git a/swh/dataset/orc_loader.py b/swh/dataset/orc_loader.py
new file mode 100644
--- /dev/null
+++ b/swh/dataset/orc_loader.py
@@ -0,0 +1,256 @@
+from collections import defaultdict
+from datetime import datetime, timedelta, timezone
+from pathlib import Path
+from typing import Dict, Iterator, List, Optional, Tuple, Type, cast
+
+from pyorc import Reader, StructRepr, TypeKind
+from pyorc.converters import ORCConverter
+
+from swh.dataset.exporters.orc import SWHTimestampConverter
+from swh.model import model as swhmodel
+from swh.model.hashutil import hash_to_bytes
+
+# Map the object_type identifier of each model class to the class itself
+SWH_OBJECT_TYPES = {
+    cls.object_type: cls  # type: ignore
+    for cls in (
+        swhmodel.Origin,
+        swhmodel.OriginVisit,
+        swhmodel.OriginVisitStatus,
+        swhmodel.Snapshot,
+        swhmodel.SnapshotBranch,
+        swhmodel.Release,
+        swhmodel.Revision,
+        swhmodel.Directory,
+        swhmodel.DirectoryEntry,
+        swhmodel.Content,
+        swhmodel.SkippedContent,
+        swhmodel.MetadataAuthority,
+        swhmodel.MetadataFetcher,
+        swhmodel.RawExtrinsicMetadata,
+        swhmodel.ExtID,
+    )
+}
+
+
+# basic utility functions
+
+
+def hash_to_bytes_or_none(hash: Optional[str]) -> Optional[bytes]:
+    return hash_to_bytes(hash) if hash is not None else None
+
+
+def orc_to_swh_date(d, prefix):
+    # Convert the (seconds, microseconds) timestamp and offset columns of an
+    # ORC row into a swh date dict, in place
+    timestamp = d.pop(prefix)
+    offset_bytes = d.pop(f"{prefix}_raw_offset_bytes")
+    if prefix == "committer_date":
+        d.pop("committer_offset")
+    else:
+        d.pop(f"{prefix}_offset")
+    sec, usec = timestamp if timestamp else (None, None)
+
+    if sec is None:
+        d[prefix] = None
+    else:
+        d[prefix] = {
+            "timestamp": {"seconds": sec, "microseconds": usec},
+            "offset_bytes": offset_bytes,
+        }
+
+
+def orc_to_datetime(sec, usec):
+    return datetime.fromtimestamp(sec, timezone.utc) + timedelta(microseconds=usec)
+
+
+# obj_type converters
+
+
+def cvrt_release(d, relations=None):
+    d = d.copy()
+    d["author"] = (
+        swhmodel.Person.from_fullname(d["author"]).to_dict() if d["author"] else None
+    )
+    orc_to_swh_date(d, "date")
+    if "synthetic" not in d:
+        d["synthetic"] = False
+    d["id"] = hash_to_bytes(d["id"])
+    if d["target"]:
+        d["target"] = hash_to_bytes_or_none(d["target"])
+    if "target_type" not in d:
+        d["target_type"] = swhmodel.TargetType.REVISION
+    return d
+
+
+def cvrt_revision(d, relations=None):
+    parents = relations["revision_history"]
+    headers = relations["revision_extra_headers"]
+    d = d.copy()
+    d["author"] = (
+        swhmodel.Person.from_fullname(d["author"]).to_dict() if d["author"] else None
+    )
+    d["committer"] = (
+        swhmodel.Person.from_fullname(d["committer"]).to_dict()
+        if d["committer"]
+        else None
+    )
+    orc_to_swh_date(d, "date")
+    orc_to_swh_date(d, "committer_date")
+    d["id"] = hash_to_bytes(d["id"])
+    d["directory"] = hash_to_bytes_or_none(d["directory"])
+    d["parents"] = [
+        hash_to_bytes_or_none(p["parent_id"])
+        for p in sorted(parents[d["id"]], key=lambda x: x["parent_rank"])
+    ]
+    if "synthetic" not in d:
+        d["synthetic"] = False
+    d["extra_headers"] = (
+        [(h["key"], h["value"]) for h in headers[d["id"]]] if headers[d["id"]] else ()
+    )
+    return d
+
+
+def cvrt_origin_visit_status(d, relations=None):
+    d = d.copy()
+    d["snapshot"] = hash_to_bytes_or_none(d["snapshot"])
+    d["date"] = orc_to_datetime(*d["date"])
+    return d
+
+
+def cvrt_origin_visit(d, relations=None):
+    d = d.copy()
+    d["date"] = orc_to_datetime(*d["date"])
+    return d
+
+
+def cvrt_snapshot(d, relations=None):
+    d = d.copy()
+    d["id"] = hash_to_bytes(d["id"])
+    branches = [
+        (
+            br["name"],
+            {
+                "target": hash_to_bytes_or_none(br["target"]),
+                "target_type": br["target_type"],
+            },
+        )
+        for br in relations["snapshot_branch"][d["id"]]
+    ]
+    d["branches"] = dict(branches)
+    return d
+
+
+def cvrt_snapshot_branch(d):
+    return (
+        d["name"],
+        {"target": hash_to_bytes_or_none(d["target"]), "target_type": d["target_type"]},
+    )
+
+
+def cvrt_directory(d, relations=None):
+    d = d.copy()
+    d_id = hash_to_bytes(d.pop("id"))
+    entries = [
+        {
+            "name": entry["name"],
+            "type": entry["type"],
+            "target": hash_to_bytes_or_none(entry["target"]),
+            "perms": entry["perms"],
+        }
+        for entry in relations["directory_entry"][d_id]
+    ]
+    d["entries"] = entries
+    return d
+
+
+def cvrt_content(d, relations=None):
+    d = d.copy()
+    d.update(
+        {k: hash_to_bytes(d[k]) for k in ("sha1", "sha256", "sha1_git", "blake2s256")}
+    )
+    return d
+
+
+# Converters from an ORC row (a dict) to a dict accepted by the from_dict()
+# method of the corresponding model class
+LOADERS = {
+    "origin_visit": cvrt_origin_visit,
+    "origin_visit_status": cvrt_origin_visit_status,
+    "release": cvrt_release,
+    "revision": cvrt_revision,
+    "snapshot": cvrt_snapshot,
+    "directory": cvrt_directory,
+    "content": cvrt_content,
+    "skipped_content": cvrt_content,
+}
+
+# Related ORC tables needed to build the objects of each main table
+RELATION_TYPES = {
+    "snapshot": ("snapshot_branch",),
+    "revision": ("revision_history", "revision_extra_headers",),
+    "directory": ("directory_entry",),
+}
+
+
+# ORC loader functions
+def _load_related_ORC(orc_file: Path) -> Tuple[str, Dict[str, List]]:
+    with orc_file.open("rb") as fileobj:
+        reader = Reader(fileobj, struct_repr=StructRepr.DICT)
+        relation_obj_type = reader.user_metadata.get("swh_object_type", b"").decode()
+        if relation_obj_type == "snapshot_branch":
+            id_col = "snapshot_id"
+        elif relation_obj_type == "directory_entry":
+            id_col = "directory_id"
+        else:
+            id_col = "id"
+        # XXX this loads the whole relation file content in memory...
+        relations = defaultdict(list)
+        convert = LOADERS.get(relation_obj_type, lambda x: x)
+        for relation in (cast(Dict, x) for x in reader):
+            relations[hash_to_bytes(relation[id_col])].append(convert(relation))
+    return relation_obj_type, relations
+
+
+def load_ORC(orc_dir: Path, obj_type: str, uuid: str) -> Iterator[swhmodel.ModelType]:
+    """Load a set of ORC files and generate swh.model objects.
+
+    From the ORC dataset directory orc_dir, load the ORC file named
+
+      {orc_dir}/{obj_type}/{obj_type}-{uuid}.orc
+
+    and generate the swh data model objects (of type obj_type) serialized in
+    this ORC dataset. This may require loading related ORC dataset files (for
+    example, loading a revision dataset file also requires the related
+    revision_history dataset).
+
+    Related ORC files are expected to follow the same file naming scheme; for
+    example, the revision_history dataset file is expected to be found at
+
+      {orc_dir}/revision_history/revision_history-{uuid}.orc
+
+    i.e. using the same uuid as the main dataset file.
+    """
+    all_relations = {}
+
+    orc_file = orc_dir / obj_type / f"{obj_type}-{uuid}.orc"
+    with orc_file.open("rb") as fileobj:
+        reader = Reader(
+            fileobj,
+            struct_repr=StructRepr.DICT,
+            converters={
+                TypeKind.TIMESTAMP: cast(Type[ORCConverter], SWHTimestampConverter)
+            },
+        )
+        obj_type = reader.user_metadata.get("swh_object_type", b"").decode()
+        if obj_type in RELATION_TYPES:
+            for relation_type in RELATION_TYPES[obj_type]:
+                orc_relation_file = (
+                    orc_dir / relation_type / f"{relation_type}-{uuid}.orc"
+                )
+                assert orc_relation_file.is_file()
+                relation_obj_type, relations = _load_related_ORC(orc_relation_file)
+                assert relation_obj_type == relation_type
+                all_relations[relation_type] = relations
+
+        convert = LOADERS.get(obj_type, lambda x, y: x)
+        swhcls = cast(Type[swhmodel.ModelType], SWH_OBJECT_TYPES[obj_type])
+        for objdict in reader:
+            obj = cast(
+                swhmodel.ModelType, swhcls.from_dict(convert(objdict, all_relations))
+            )
+            yield obj
diff --git a/swh/dataset/test/__init__.py b/swh/dataset/test/__init__.py
new file mode 100644
diff --git a/swh/dataset/test/test_orc_load.py b/swh/dataset/test/test_orc_load.py
new file mode 100644
--- /dev/null
+++ b/swh/dataset/test/test_orc_load.py
@@ -0,0 +1,26 @@
+import re
+
+import attrs
+import pytest
+
+from swh.dataset.orc_loader import load_ORC
+from swh.dataset.relational import MAIN_TABLES
+from swh.model.tests.swh_model_data import TEST_OBJECTS
+
+from .test_orc import orc_export
+
+
+@pytest.mark.parametrize("obj_type", MAIN_TABLES.keys())
+def test_load_objects(obj_type):
+    config = {"orc": {"group_tables": True}}
+    objects = TEST_OBJECTS[obj_type]
+    if obj_type == "content":
+        # the ORC export does not keep the data and ctime fields, so compare
+        # loaded objects against contents with those fields cleared
+        objects = [attrs.evolve(obj, data=None, ctime=None) for obj in objects]
+    with orc_export({obj_type: TEST_OBJECTS[obj_type]}, config=config) as export_dir:
+        for orc_file in (export_dir / obj_type).iterdir():
+            m = re.match(r"(?P<type>[a-z_]+)-(?P<uuid>[0-9a-f-]+)\.orc", orc_file.name)
+            if m:
+                assert m.group("type") == obj_type
+                uuid = m.group("uuid")
+                for obj in load_ORC(export_dir, obj_type, uuid):
+                    assert obj in objects