D7520.diff

diff --git a/swh/dataset/orc_loader.py b/swh/dataset/orc_loader.py
new file mode 100644
--- /dev/null
+++ b/swh/dataset/orc_loader.py
@@ -0,0 +1,267 @@
+from collections import defaultdict
+from datetime import datetime, timedelta, timezone
+from pathlib import Path
+from typing import Dict, Iterator, List, Optional, Tuple, Type, cast
+
+from pyorc import Reader, StructRepr, TypeKind
+from pyorc.converters import ORCConverter
+
+from swh.dataset.exporters.orc import SWHTimestampConverter
+from swh.model import model as swhmodel
+from swh.model.hashutil import hash_to_bytes
+
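+# Map of swh.model object_type names to their model classes; used to rebuild
+# model objects (via from_dict) from the dicts read out of the ORC files.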
+SWH_OBJECT_TYPES = {
+    cls.object_type: cls  # type: ignore
+    for cls in (
+        swhmodel.Origin,
+        swhmodel.OriginVisit,
+        swhmodel.OriginVisitStatus,
+        swhmodel.Snapshot,
+        swhmodel.SnapshotBranch,
+        swhmodel.Release,
+        swhmodel.Revision,
+        swhmodel.Directory,
+        swhmodel.DirectoryEntry,
+        swhmodel.Content,
+        swhmodel.SkippedContent,
+        swhmodel.MetadataAuthority,
+        swhmodel.MetadataFetcher,
+        swhmodel.RawExtrinsicMetadata,
+        swhmodel.ExtID,
+    )
+}
+
+
+# basic utility functions
+
+
+def hash_to_bytes_or_none(hash: Optional[str]) -> Optional[bytes]:
+    return hash_to_bytes(hash) if hash is not None else None
+
+
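+# Rebuild the nested swh date dict ({"timestamp": ..., "offset_bytes": ...})
+# from the flat {prefix}, {prefix}_offset and {prefix}_raw_offset_bytes ORC
+# columns (for the committer date, the offset column is "committer_offset").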
+def orc_to_swh_date(d, prefix):
+    timestamp = d.pop(f"{prefix}")
+    offset_bytes = d.pop(f"{prefix}_raw_offset_bytes")
+    if prefix == "committer_date":
+        d.pop("committer_offset")
+    else:
+        d.pop(f"{prefix}_offset")
+    sec, usec = timestamp if timestamp else (None, None)
+
+    if sec is None:
+        d[prefix] = None
+    else:
+        d[prefix] = {
+            "timestamp": {"seconds": sec, "microseconds": usec},
+            "offset_bytes": offset_bytes,
+        }
+
+
+def orc_to_datetime(sec, usec):
+    return datetime.fromtimestamp(sec, timezone.utc) + timedelta(microseconds=usec)
+
+
+# obj_type converters
+
+
+def cvrt_release(d, relations=None):
+    d = d.copy()
+    d["author"] = (
+        swhmodel.Person.from_fullname(d["author"]).to_dict() if d["author"] else None
+    )
+    orc_to_swh_date(d, "date")
+    if "synthetic" not in d:
+        d["synthetic"] = False
+    d["id"] = hash_to_bytes(d["id"])
+    if d["target"]:
+        d["target"] = hash_to_bytes_or_none(d["target"])
+    if "target_type" not in d:
+        d["target_type"] = swhmodel.TargetType.REVISION
+    return d
+
+
+def cvrt_revision(d, relations=None):
+    parents = relations["revision_history"]
+    headers = relations["revision_extra_headers"]
+    d = d.copy()
+    d["author"] = (
+        swhmodel.Person.from_fullname(d["author"]).to_dict() if d["author"] else None
+    )
+    d["committer"] = (
+        swhmodel.Person.from_fullname(d["committer"]).to_dict()
+        if d["committer"]
+        else None
+    )
+    orc_to_swh_date(d, "date")
+    orc_to_swh_date(d, "committer_date")
+    d["id"] = hash_to_bytes(d["id"])
+    d["directory"] = hash_to_bytes_or_none(d["directory"])
+    d["parents"] = [
+        hash_to_bytes_or_none(p["parent_id"])
+        for p in sorted(parents[d["id"]], key=lambda x: x["parent_rank"])
+    ]
+    if "synthetic" not in d:
+        d["synthetic"] = False
+    d["extra_headers"] = (
+        [(h["key"], h["value"]) for h in headers[d["id"]]] if headers[d["id"]] else ()
+    )
+    return d
+
+
+def cvrt_origin_visit_status(d, relations=None):
+    d = d.copy()
+    d["snapshot"] = hash_to_bytes_or_none(d["snapshot"])
+    d["date"] = orc_to_datetime(*d["date"])
+    return d
+
+
+def cvrt_origin_visit(d, relations=None):
+    d = d.copy()
+    d["date"] = orc_to_datetime(*d["date"])
+    return d
+
+
+def cvrt_snapshot(d, relations=None):
+    d = d.copy()
+    d["id"] = hash_to_bytes(d["id"])
+    branches = [
+        (
+            br["name"],
+            {
+                "target": hash_to_bytes_or_none(br["target"]),
+                "target_type": br["target_type"],
+            },
+        )
+        for br in relations["snapshot_branch"][d["id"]]
+    ]
+    d["branches"] = dict(branches)
+    return d
+
+
+def cvrt_snapshot_branch(d):
+    return (
+        d["name"],
+        {"target": hash_to_bytes_or_none(d["target"]), "target_type": d["target_type"]},
+    )
+
+
+def cvrt_directory(d, relations=None):
+    d = d.copy()
+    d_id = hash_to_bytes(d.pop("id"))
+    entries = [
+        {
+            "name": entry["name"],
+            "type": entry["type"],
+            "target": hash_to_bytes_or_none(entry["target"]),
+            "perms": entry["perms"],
+        }
+        for entry in relations["directory_entry"][d_id]
+    ]
+    d["entries"] = entries
+    return d
+
+
+def cvrt_content(d, relations=None):
+    d = d.copy()
+    d.update(
+        {k: hash_to_bytes(d[k]) for k in ("sha1", "sha256", "sha1_git", "blake2s256")}
+    )
+    return d
+
+
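+# Converters from ORC row dicts to swh.model from_dict-compatible dicts, per
+# object type; types absent from this mapping are passed through unchanged.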
+LOADERS = {
+ "origin_visit": cvrt_origin_visit,
+ "origin_visit_status": cvrt_origin_visit_status,
+ "release": cvrt_release,
+ "revision": cvrt_revision,
+ "snapshot": cvrt_snapshot,
+ "directory": cvrt_directory,
+ "content": cvrt_content,
+ "skipped_content": cvrt_content,
+}
+
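+# Related ("relation") tables that must be loaded alongside a main table to
+# rebuild complete model objects (e.g. revisions need their parents list).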
+RELATION_TYPES = {
+ "snapshot": ("snapshot_branch",),
+ "revision": ("revision_history", "revision_extra_headers",),
+ "directory": ("directory_entry",),
+}
+
+
+# ORC loader functions
+def _load_related_ORC(orc_file: Path) -> Tuple[str, Dict[str, List]]:
+    with orc_file.open("rb") as fileobj:
+        reader = Reader(fileobj, struct_repr=StructRepr.DICT)
+        relation_obj_type = reader.user_metadata.get("swh_object_type", b"").decode()
+        if relation_obj_type == "snapshot_branch":
+            id_col = "snapshot_id"
+        elif relation_obj_type == "directory_entry":
+            id_col = "directory_id"
+        else:
+            id_col = "id"
+        # XXX this loads the whole relation file content in memory...
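+        # group relation rows by the id of the object they belong to, so the
+        # main loader can attach them to that object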
+        relations = defaultdict(list)
+        convert = LOADERS.get(relation_obj_type, lambda x: x)
+        for relation in (cast(Dict, x) for x in reader):
+            relations[hash_to_bytes(relation[id_col])].append(convert(relation))
+        return relation_obj_type, relations
+
+
+def load_ORC(orc_dir: Path, obj_type: str, uuid: str) -> Iterator[swhmodel.ModelType]:
+ """Load a set of ORC files and generates swh.model objects
+
+ From the ORC dataset directory orc_dir, load the ORC file named
+
+ {orc_dir}/{obj_type}/{obj_type}-{uuid}.orc
+
+ and generates swh data model objects (of type obj_type) that have been
+ serialized in the ORC dataset. This may need to load related ORC dataset
+ files (for example loading a revision dataset file also requires the
+ related revision_history dataset).
+
+ Related ORC files are expected to be found using the same file name schema.
+ For example, the revision_history dataset file is expected to be found at:
+
+ {orc_dir}/revision_history/revision_history-{uuid}.orc
+
+ Thus using the same uuid as the main dataset file.
+ """
+    all_relations = {}
+
+    orc_file = orc_dir / obj_type / f"{obj_type}-{uuid}.orc"
+    with orc_file.open("rb") as fileobj:
+        reader = Reader(
+            fileobj,
+            struct_repr=StructRepr.DICT,
+            converters={
+                TypeKind.TIMESTAMP: cast(Type[ORCConverter], SWHTimestampConverter)
+            },
+        )
+        obj_type = reader.user_metadata.get("swh_object_type", b"").decode()
+        if obj_type in RELATION_TYPES:
+            for relation_type in RELATION_TYPES[obj_type]:
+                orc_relation_file = (
+                    orc_dir / relation_type / f"{relation_type}-{uuid}.orc"
+                )
+                assert orc_relation_file.is_file()
+                relation_obj_type, relations = _load_related_ORC(orc_relation_file)
+                assert relation_obj_type == relation_type
+                all_relations[relation_type] = relations
+
+        convert = LOADERS.get(obj_type, lambda x, y: x)
+        swhcls = cast(Type[swhmodel.ModelType], SWH_OBJECT_TYPES[obj_type])
+        for objdict in reader:
+            obj = cast(
+                swhmodel.ModelType, swhcls.from_dict(convert(objdict, all_relations))
+            )
+            yield obj
diff --git a/swh/dataset/test/__init__.py b/swh/dataset/test/__init__.py
new file mode 100644
diff --git a/swh/dataset/test/test_orc_load.py b/swh/dataset/test/test_orc_load.py
new file mode 100644
--- /dev/null
+++ b/swh/dataset/test/test_orc_load.py
@@ -0,0 +1,26 @@
+import re
+
+import attrs
+import pytest
+
+from swh.dataset.orc_loader import load_ORC
+from swh.dataset.relational import MAIN_TABLES
+from swh.model.tests.swh_model_data import TEST_OBJECTS
+
+from .test_orc import orc_export
+
+
+@pytest.mark.parametrize("obj_type", MAIN_TABLES.keys())
+def test_load_objects(obj_type):
+    config = {"orc": {"group_tables": True}}
+    objects = TEST_OBJECTS[obj_type]
+    if obj_type == "content":
+        objects = [attrs.evolve(obj, data=None, ctime=None) for obj in objects]
+    with orc_export({obj_type: TEST_OBJECTS[obj_type]}, config=config) as export_dir:
+        for orc_file in (export_dir / obj_type).iterdir():
+            m = re.match(r"(?P<type>[a-z_]+)-(?P<uuid>[0-9a-f-]+)\.orc", orc_file.name)
+            if m:
+                assert m.group("type") == obj_type
+                uuid = m.group("uuid")
+                for obj in load_ORC(export_dir, obj_type, uuid):
+                    assert obj in objects
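
A minimal usage sketch (not part of the diff): it assumes an ORC export
directory laid out as described in the load_ORC docstring; the directory path
and uuid below are hypothetical.

    from pathlib import Path

    from swh.dataset.orc_loader import load_ORC

    orc_dir = Path("swh-export/orc")  # hypothetical export directory
    uuid = "f0ec566e-0456-4e5c-a4ea-b64bf9adbb2c"  # hypothetical file uuid

    # iterate over the Release model objects serialized in
    # swh-export/orc/release/release-<uuid>.orc
    for release in load_ORC(orc_dir, "release", uuid):
        print(release.id.hex())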
