D7520.diff
diff --git a/swh/dataset/orc_loader.py b/swh/dataset/orc_loader.py
new file mode 100644
--- /dev/null
+++ b/swh/dataset/orc_loader.py
@@ -0,0 +1,256 @@
+from collections import defaultdict
+from datetime import datetime, timedelta, timezone
+from pathlib import Path
+from typing import Dict, Iterator, List, Optional, Tuple, Type, cast
+
+from pyorc import Reader, StructRepr, TypeKind
+from pyorc.converters import ORCConverter
+
+from swh.dataset.exporters.orc import SWHTimestampConverter
+from swh.model import model as swhmodel
+from swh.model.hashutil import hash_to_bytes
+
+SWH_OBJECT_TYPES = {
+    cls.object_type: cls  # type: ignore
+    for cls in (
+        swhmodel.Origin,
+        swhmodel.OriginVisit,
+        swhmodel.OriginVisitStatus,
+        swhmodel.Snapshot,
+        swhmodel.SnapshotBranch,
+        swhmodel.Release,
+        swhmodel.Revision,
+        swhmodel.Directory,
+        swhmodel.DirectoryEntry,
+        swhmodel.Content,
+        swhmodel.SkippedContent,
+        swhmodel.MetadataAuthority,
+        swhmodel.MetadataFetcher,
+        swhmodel.RawExtrinsicMetadata,
+        swhmodel.ExtID,
+    )
+}
+
+
+# basic utility functions
+
+
+def hash_to_bytes_or_none(hash: Optional[str]) -> Optional[bytes]:
+    return hash_to_bytes(hash) if hash is not None else None
+
+
+def orc_to_swh_date(d, prefix):
+    timestamp = d.pop(f"{prefix}")
+    offset_bytes = d.pop(f"{prefix}_raw_offset_bytes")
+    if prefix == "committer_date":
+        d.pop("committer_offset")
+    else:
+        d.pop(f"{prefix}_offset")
+    sec, usec = timestamp if timestamp else (None, None)
+
+    if sec is None:
+        d[prefix] = None
+    else:
+        d[prefix] = {
+            "timestamp": {"seconds": sec, "microseconds": usec},
+            "offset_bytes": offset_bytes,
+        }
+
+
+def orc_to_datetime(sec, usec):
+    return datetime.fromtimestamp(sec, timezone.utc) + timedelta(microseconds=usec)
+
+
+# obj_type converters
+
+
+def cvrt_release(d, relations=None):
+    d = d.copy()
+    d["author"] = (
+        swhmodel.Person.from_fullname(d["author"]).to_dict() if d["author"] else None
+    )
+    orc_to_swh_date(d, "date")
+    if "synthetic" not in d:
+        d["synthetic"] = False
+    d["id"] = hash_to_bytes(d["id"])
+    if d["target"]:
+        d["target"] = hash_to_bytes_or_none(d["target"])
+    if "target_type" not in d:
+        d["target_type"] = swhmodel.TargetType.REVISION
+    return d
+
+
+def cvrt_revision(d, relations=None):
+    parents = relations["revision_history"]
+    headers = relations["revision_extra_headers"]
+    d = d.copy()
+    d["author"] = (
+        swhmodel.Person.from_fullname(d["author"]).to_dict() if d["author"] else None
+    )
+    d["committer"] = (
+        swhmodel.Person.from_fullname(d["committer"]).to_dict()
+        if d["committer"]
+        else None
+    )
+    orc_to_swh_date(d, "date")
+    orc_to_swh_date(d, "committer_date")
+    d["id"] = hash_to_bytes(d["id"])
+    d["directory"] = hash_to_bytes_or_none(d["directory"])
+    d["parents"] = [
+        hash_to_bytes_or_none(p["parent_id"])
+        for p in sorted(parents[d["id"]], key=lambda x: x["parent_rank"])
+    ]
+    if "synthetic" not in d:
+        d["synthetic"] = False
+    d["extra_headers"] = (
+        [(h["key"], h["value"]) for h in headers[d["id"]]] if headers[d["id"]] else ()
+    )
+    return d
+
+
+def cvrt_origin_visit_status(d, relations=None):
+    d = d.copy()
+    d["snapshot"] = hash_to_bytes_or_none(d["snapshot"])
+    d["date"] = orc_to_datetime(*d["date"])
+    return d
+
+
+def cvrt_origin_visit(d, relations=None):
+    d = d.copy()
+    d["date"] = orc_to_datetime(*d["date"])
+    return d
+
+
+def cvrt_snapshot(d, relations=None):
+    d = d.copy()
+    d["id"] = hash_to_bytes(d["id"])
+    branches = [
+        (
+            br["name"],
+            {
+                "target": hash_to_bytes_or_none(br["target"]),
+                "target_type": br["target_type"],
+            },
+        )
+        for br in relations["snapshot_branch"][d["id"]]
+    ]
+    d["branches"] = dict(branches)
+    return d
+
+
+def cvrt_snapshot_branch(d):
+    return (
+        d["name"],
+        {"target": hash_to_bytes_or_none(d["target"]), "target_type": d["target_type"]},
+    )
+
+
+def cvrt_directory(d, relations=None):
+    d = d.copy()
+    d_id = hash_to_bytes(d.pop("id"))
+    entries = [
+        {
+            "name": entry["name"],
+            "type": entry["type"],
+            "target": hash_to_bytes_or_none(entry["target"]),
+            "perms": entry["perms"],
+        }
+        for entry in relations["directory_entry"][d_id]
+    ]
+    d["entries"] = entries
+    return d
+
+
+def cvrt_content(d, relations=None):
+    d = d.copy()
+    d.update(
+        {k: hash_to_bytes(d[k]) for k in ("sha1", "sha256", "sha1_git", "blake2s256")}
+    )
+    return d
+
+
+LOADERS = {
+ "origin_visit": cvrt_origin_visit,
+ "origin_visit_status": cvrt_origin_visit_status,
+ "release": cvrt_release,
+ "revision": cvrt_revision,
+ "snapshot": cvrt_snapshot,
+ "directory": cvrt_directory,
+ "content": cvrt_content,
+ "skipped_content": cvrt_content,
+}
+
+RELATION_TYPES = {
+ "snapshot": ("snapshot_branch",),
+ "revision": ("revision_history", "revision_extra_headers",),
+ "directory": ("directory_entry",),
+}
+
+
+# ORC loader functions
+def _load_related_ORC(orc_file: Path) -> Tuple[str, Dict[str, List]]:
+    with orc_file.open("rb") as fileobj:
+        reader = Reader(fileobj, struct_repr=StructRepr.DICT)
+        relation_obj_type = reader.user_metadata.get("swh_object_type", b"").decode()
+        if relation_obj_type == "snapshot_branch":
+            id_col = "snapshot_id"
+        elif relation_obj_type == "directory_entry":
+            id_col = "directory_id"
+        else:
+            id_col = "id"
+        # XXX this loads the whole relation file content in memory...
+        relations = defaultdict(list)
+        convert = LOADERS.get(relation_obj_type, lambda x: x)
+        for relation in (cast(Dict, x) for x in reader):
+            relations[hash_to_bytes(relation[id_col])].append(convert(relation))
+        return relation_obj_type, relations
+
+
+def load_ORC(orc_dir: Path, obj_type: str, uuid: str) -> Iterator[swhmodel.ModelType]:
+ """Load a set of ORC files and generates swh.model objects
+
+ From the ORC dataset directory orc_dir, load the ORC file named
+
+ {orc_dir}/{obj_type}/{obj_type}-{uuid}.orc
+
+ and generates swh data model objects (of type obj_type) that have been
+ serialized in the ORC dataset. This may need to load related ORC dataset
+ files (for example loading a revision dataset file also requires the
+ related revision_history dataset).
+
+ Related ORC files are expected to be found using the same file name schema.
+ For example, the revision_history dataset file is expected to be found at:
+
+ {orc_dir}/revision_history/revision_history-{uuid}.orc
+
+ Thus using the same uuid as the main dataset file.
+ """
+    all_relations = {}
+
+    orc_file = orc_dir / obj_type / f"{obj_type}-{uuid}.orc"
+    with orc_file.open("rb") as fileobj:
+        reader = Reader(
+            fileobj,
+            struct_repr=StructRepr.DICT,
+            converters={
+                TypeKind.TIMESTAMP: cast(Type[ORCConverter], SWHTimestampConverter)
+            },
+        )
+        obj_type = reader.user_metadata.get("swh_object_type", b"").decode()
+        if obj_type in RELATION_TYPES:
+            for relation_type in RELATION_TYPES[obj_type]:
+                orc_relation_file = (
+                    orc_dir / relation_type / f"{relation_type}-{uuid}.orc"
+                )
+                assert orc_relation_file.is_file()
+                relation_obj_type, relations = _load_related_ORC(orc_relation_file)
+                assert relation_obj_type == relation_type
+                all_relations[relation_type] = relations
+
+        convert = LOADERS.get(obj_type, lambda x, y: x)
+        swhcls = cast(Type[swhmodel.ModelType], SWH_OBJECT_TYPES[obj_type])
+        for objdict in reader:
+            obj = cast(
+                swhmodel.ModelType, swhcls.from_dict(convert(objdict, all_relations))
+            )
+            yield obj
diff --git a/swh/dataset/test/__init__.py b/swh/dataset/test/__init__.py
new file mode 100644
diff --git a/swh/dataset/test/test_orc_load.py b/swh/dataset/test/test_orc_load.py
new file mode 100644
--- /dev/null
+++ b/swh/dataset/test/test_orc_load.py
@@ -0,0 +1,26 @@
+import re
+
+import attrs
+import pytest
+
+from swh.dataset.orc_loader import load_ORC
+from swh.dataset.relational import MAIN_TABLES
+from swh.model.tests.swh_model_data import TEST_OBJECTS
+
+from .test_orc import orc_export
+
+
+@pytest.mark.parametrize("obj_type", MAIN_TABLES.keys())
+def test_load_origins(obj_type):
+    config = {"orc": {"group_tables": True}}
+    objects = TEST_OBJECTS[obj_type]
+    if obj_type == "content":
+        objects = [attrs.evolve(obj, data=None, ctime=None) for obj in objects]
+    with orc_export({obj_type: TEST_OBJECTS[obj_type]}, config=config) as export_dir:
+        for orc_file in (export_dir / obj_type).iterdir():
+            m = re.match(r"(?P<type>[a-z_]+)-(?P<uuid>[0-9a-f-]+).orc", orc_file.name)
+            if m:
+                assert m.group("type") == obj_type
+                uuid = m.group("uuid")
+                for obj in load_ORC(export_dir, obj_type, uuid):
+                    assert obj in objects
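
A minimal usage sketch (not part of the patch above): given an ORC export directory laid out as described in the load_ORC docstring, the loader can be iterated over directly. The directory path and uuid below are placeholders; the sketch assumes the dataset was produced by swh.dataset's ORC exporter.

    from pathlib import Path

    from swh.dataset.orc_loader import load_ORC

    # Placeholder values: point orc_dir at the root of an ORC export and take
    # the uuid from a file name of the form {obj_type}-{uuid}.orc inside it.
    orc_dir = Path("/srv/softwareheritage/orc-export")
    uuid = "00000000-0000-0000-0000-000000000000"

    # Yields swh.model.model.Revision objects reconstructed from
    # revision/revision-{uuid}.orc and its revision_history /
    # revision_extra_headers companion files.
    for revision in load_ORC(orc_dir, "revision", uuid):
        print(revision.id.hex())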
Attached To: D7520: Add an ORC file loading function