diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -2,4 +2,5 @@
 swh.core[http] >= 2
 swh.journal >= 0.9
 swh.model >= 4.3
+swh.objstorage
 swh.storage
diff --git a/swh/dataset/exporters/orc.py b/swh/dataset/exporters/orc.py
--- a/swh/dataset/exporters/orc.py
+++ b/swh/dataset/exporters/orc.py
@@ -29,6 +29,8 @@
 from swh.dataset.utils import remove_pull_requests
 from swh.model.hashutil import hash_to_hex
 from swh.model.model import TimestampWithTimezone
+from swh.objstorage.factory import get_objstorage
+from swh.objstorage.objstorage import ID_HASH_ALGO, ObjNotFoundError
 
 ORC_TYPE_MAP = {
     "string": String,
@@ -116,8 +118,14 @@
 
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
-        config = self.config.get('orc', {})
+        config = self.config.get("orc", {})
         self.max_rows = config.get("max_rows", {})
+        self.with_data = config.get("with_data", False)
+        self.objstorage = None
+        if self.with_data:
+            if "objstorage" not in config:
+                raise ValueError("'with_data' requires an 'objstorage' configuration")
+            self.objstorage = get_objstorage(**config["objstorage"])
         self._reset()
 
     def _reset(self):
@@ -331,6 +339,15 @@
 
     def process_content(self, content):
         content_writer = self.get_writer_for("content")
+        # data is left as None unless with_data is set and the object is found
+        data = None
+        if self.with_data:
+            obj_id = content[ID_HASH_ALGO]
+            try:
+                data = self.objstorage.get(obj_id)
+            except ObjNotFoundError:
+                logger.warning("Missing object %s", hash_to_hex(obj_id))
+
         content_writer.write(
             (
                 hash_to_hex_or_none(content["sha1"]),
@@ -339,6 +356,7 @@
                 hash_to_hex_or_none(content["blake2s256"]),
                 content["length"],
                 content["status"],
+                data,
             )
         )
 
diff --git a/swh/dataset/relational.py b/swh/dataset/relational.py
--- a/swh/dataset/relational.py
+++ b/swh/dataset/relational.py
@@ -86,6 +86,7 @@
         ("blake2s256", "string"),
         ("length", "bigint"),
         ("status", "string"),
+        ("data", "binary"),
     ],
     "skipped_content": [
         ("sha1", "string"),
diff --git a/swh/dataset/test/test_orc.py b/swh/dataset/test/test_orc.py
--- a/swh/dataset/test/test_orc.py
+++ b/swh/dataset/test/test_orc.py
@@ -7,14 +7,9 @@
 import pyorc
 import pytest
 
-from swh.dataset.exporters.orc import (
-    ORCExporter,
-    SWHTimestampConverter,
-    datetime_to_tuple,
-    hash_to_hex_or_none,
-    swh_date_to_tuple,
-)
+from swh.dataset.exporters import orc
 from swh.model.tests.swh_model_data import TEST_OBJECTS
+from swh.objstorage.factory import get_objstorage
 
 
 @contextmanager
@@ -32,7 +27,7 @@
     with orc_tmpdir(tmpdir) as tmpdir:
         if config is None:
             config = {}
-        with ORCExporter(config, tmpdir) as exporter:
+        with orc.ORCExporter(config, tmpdir) as exporter:
             for object_type, objects in messages.items():
                 for obj in objects:
                     exporter.process_object(object_type, obj.to_dict())
@@ -47,7 +42,7 @@
             with orc_file.open("rb") as orc_obj:
                 reader = pyorc.Reader(
                     orc_obj,
-                    converters={pyorc.TypeKind.TIMESTAMP: SWHTimestampConverter},
+                    converters={pyorc.TypeKind.TIMESTAMP: orc.SWHTimestampConverter},
                 )
                 obj_type = reader.user_metadata["swh_object_type"].decode()
                 res[obj_type].extend(reader)
@@ -70,9 +65,12 @@
     obj_type = "origin_visit"
     output = exporter({obj_type: TEST_OBJECTS[obj_type]})
     for obj in TEST_OBJECTS[obj_type]:
-        assert (obj.origin, obj.visit, datetime_to_tuple(obj.date), obj.type) in output[
-            obj_type
-        ]
+        assert (
+            obj.origin,
+            obj.visit,
+            orc.datetime_to_tuple(obj.date),
+            obj.type,
+        ) in output[obj_type]
 
 
 def test_export_origin_visit_status():
@@ -82,9 +80,9 @@
         assert (
             obj.origin,
             obj.visit,
-            datetime_to_tuple(obj.date),
+            orc.datetime_to_tuple(obj.date),
             obj.status,
-            hash_to_hex_or_none(obj.snapshot),
+            orc.hash_to_hex_or_none(obj.snapshot),
             obj.type,
         ) in output[obj_type]
 
@@ -93,14 +91,14 @@
     obj_type = "snapshot"
     output = exporter({obj_type: TEST_OBJECTS[obj_type]})
     for obj in TEST_OBJECTS[obj_type]:
-        assert (hash_to_hex_or_none(obj.id),) in output["snapshot"]
+        assert (orc.hash_to_hex_or_none(obj.id),) in output["snapshot"]
         for branch_name, branch in obj.branches.items():
             if branch is None:
                 continue
             assert (
-                hash_to_hex_or_none(obj.id),
+                orc.hash_to_hex_or_none(obj.id),
                 branch_name,
-                hash_to_hex_or_none(branch.target),
+                orc.hash_to_hex_or_none(branch.target),
                 str(branch.target_type.value),
             ) in output["snapshot_branch"]
 
@@ -110,13 +108,15 @@
     output = exporter({obj_type: TEST_OBJECTS[obj_type]})
     for obj in TEST_OBJECTS[obj_type]:
         assert (
-            hash_to_hex_or_none(obj.id),
+            orc.hash_to_hex_or_none(obj.id),
             obj.name,
             obj.message,
-            hash_to_hex_or_none(obj.target),
+            orc.hash_to_hex_or_none(obj.target),
             obj.target_type.value,
             obj.author.fullname if obj.author else None,
-            *swh_date_to_tuple(obj.date.to_dict() if obj.date is not None else None),
+            *orc.swh_date_to_tuple(
+                obj.date.to_dict() if obj.date is not None else None
+            ),
             obj.raw_manifest,
         ) in output[obj_type]
 
@@ -126,22 +126,24 @@
     output = exporter({obj_type: TEST_OBJECTS[obj_type]})
     for obj in TEST_OBJECTS[obj_type]:
         assert (
-            hash_to_hex_or_none(obj.id),
+            orc.hash_to_hex_or_none(obj.id),
             obj.message,
             obj.author.fullname,
-            *swh_date_to_tuple(obj.date.to_dict() if obj.date is not None else None),
+            *orc.swh_date_to_tuple(
+                obj.date.to_dict() if obj.date is not None else None
+            ),
             obj.committer.fullname,
-            *swh_date_to_tuple(
+            *orc.swh_date_to_tuple(
                 obj.committer_date.to_dict() if obj.committer_date is not None else None
             ),
-            hash_to_hex_or_none(obj.directory),
+            orc.hash_to_hex_or_none(obj.directory),
             obj.type.value,
             obj.raw_manifest,
         ) in output["revision"]
         for i, parent in enumerate(obj.parents):
             assert (
-                hash_to_hex_or_none(obj.id),
-                hash_to_hex_or_none(parent),
+                orc.hash_to_hex_or_none(obj.id),
+                orc.hash_to_hex_or_none(parent),
                 i,
             ) in output["revision_history"]
 
@@ -150,13 +152,15 @@
     obj_type = "directory"
     output = exporter({obj_type: TEST_OBJECTS[obj_type]})
     for obj in TEST_OBJECTS[obj_type]:
-        assert (hash_to_hex_or_none(obj.id), obj.raw_manifest) in output["directory"]
+        assert (orc.hash_to_hex_or_none(obj.id), obj.raw_manifest) in output[
+            "directory"
+        ]
         for entry in obj.entries:
             assert (
-                hash_to_hex_or_none(obj.id),
+                orc.hash_to_hex_or_none(obj.id),
                 entry.name,
                 entry.type,
-                hash_to_hex_or_none(entry.target),
+                orc.hash_to_hex_or_none(entry.target),
                 entry.perms,
             ) in output["directory_entry"]
 
@@ -166,12 +170,13 @@
     output = exporter({obj_type: TEST_OBJECTS[obj_type]})
     for obj in TEST_OBJECTS[obj_type]:
         assert (
-            hash_to_hex_or_none(obj.sha1),
-            hash_to_hex_or_none(obj.sha1_git),
-            hash_to_hex_or_none(obj.sha256),
-            hash_to_hex_or_none(obj.blake2s256),
+            orc.hash_to_hex_or_none(obj.sha1),
+            orc.hash_to_hex_or_none(obj.sha1_git),
+            orc.hash_to_hex_or_none(obj.sha256),
+            orc.hash_to_hex_or_none(obj.blake2s256),
             obj.length,
             obj.status,
+            None,
         ) in output[obj_type]
 
 
@@ -180,10 +185,10 @@
     output = exporter({obj_type: TEST_OBJECTS[obj_type]})
     for obj in TEST_OBJECTS[obj_type]:
         assert (
-            hash_to_hex_or_none(obj.sha1),
-            hash_to_hex_or_none(obj.sha1_git),
-            hash_to_hex_or_none(obj.sha256),
-            hash_to_hex_or_none(obj.blake2s256),
+            orc.hash_to_hex_or_none(obj.sha1),
+            orc.hash_to_hex_or_none(obj.sha1_git),
+            orc.hash_to_hex_or_none(obj.sha256),
+            orc.hash_to_hex_or_none(obj.blake2s256),
             obj.length,
             obj.status,
             obj.reason,
@@ -192,13 +197,13 @@
 
 def test_date_to_tuple():
     ts = {"seconds": 123456, "microseconds": 1515}
-    assert swh_date_to_tuple({"timestamp": ts, "offset_bytes": b"+0100"}) == (
+    assert orc.swh_date_to_tuple({"timestamp": ts, "offset_bytes": b"+0100"}) == (
         (123456, 1515),
         60,
         b"+0100",
     )
 
-    assert swh_date_to_tuple(
+    assert orc.swh_date_to_tuple(
         {
             "timestamp": ts,
             "offset": 120,
@@ -207,15 +212,13 @@
         }
     ) == ((123456, 1515), 60, b"+0100")
 
-    assert swh_date_to_tuple(
+    assert orc.swh_date_to_tuple(
         {"timestamp": ts, "offset": 120, "negative_utc": False,}
     ) == ((123456, 1515), 120, b"+0200")
 
-    assert swh_date_to_tuple({"timestamp": ts, "offset": 0, "negative_utc": True,}) == (
-        (123456, 1515),
-        0,
-        b"-0000",
-    )
+    assert orc.swh_date_to_tuple(
+        {"timestamp": ts, "offset": 0, "negative_utc": True,}
+    ) == ((123456, 1515), 0, b"-0000",)
 
 
 # mapping of related tables for each main table (if any)
@@ -264,7 +267,8 @@
     for orc_file in orcfiles:
         with orc_file.open("rb") as orc_obj:
             reader = pyorc.Reader(
-                orc_obj, converters={pyorc.TypeKind.TIMESTAMP: SWHTimestampConverter},
+                orc_obj,
+                converters={pyorc.TypeKind.TIMESTAMP: orc.SWHTimestampConverter},
             )
             uuid = reader.user_metadata["swh_uuid"].decode()
             assert orc_file.basename == f"{obj_type}-{uuid}.orc"
@@ -277,10 +281,38 @@
             with orc_file.open("rb") as orc_obj:
                 reader = pyorc.Reader(
                     orc_obj,
-                    converters={pyorc.TypeKind.TIMESTAMP: SWHTimestampConverter},
+                    converters={pyorc.TypeKind.TIMESTAMP: orc.SWHTimestampConverter},
                 )
                 assert reader.user_metadata["swh_uuid"].decode() == uuid
                 rows = list(reader)
                 # check branches in this file only concern current snapshot (obj_id)
                 for row in rows:
                     assert row[0] in obj_ids
+
+
+def test_export_content_with_data(monkeypatch, tmpdir):
+    obj_type = "content"
+    objstorage = get_objstorage("memory")
+    for content in TEST_OBJECTS[obj_type]:
+        objstorage.add(content.data)
+
+    def get_objstorage_mock(**kw):
+        assert kw == {"cls": "mock"}
+        return objstorage
+
+    monkeypatch.setattr(orc, "get_objstorage", get_objstorage_mock)
+    config = {
+        "orc": {"with_data": True, "objstorage": {"cls": "mock"},},
+    }
+
+    output = exporter({obj_type: TEST_OBJECTS[obj_type]}, config=config, tmpdir=tmpdir)
+    for obj in TEST_OBJECTS[obj_type]:
+        assert (
+            orc.hash_to_hex_or_none(obj.sha1),
+            orc.hash_to_hex_or_none(obj.sha1_git),
+            orc.hash_to_hex_or_none(obj.sha256),
+            orc.hash_to_hex_or_none(obj.blake2s256),
+            obj.length,
+            obj.status,
+            obj.data,
+        ) in output[obj_type]