diff --git a/swh/dataset/exporters/orc.py b/swh/dataset/exporters/orc.py
--- a/swh/dataset/exporters/orc.py
+++ b/swh/dataset/exporters/orc.py
@@ -29,6 +29,9 @@
 from swh.dataset.utils import remove_pull_requests
 from swh.model.hashutil import hash_to_hex
 from swh.model.model import TimestampWithTimezone
+from swh.objstorage.exc import ObjNotFoundError
+from swh.objstorage.factory import get_objstorage
+from swh.objstorage.objstorage import ID_HASH_ALGO
 
 ORC_TYPE_MAP = {
     "string": String,
@@ -118,6 +121,11 @@
         super().__init__(*args, **kwargs)
         config = self.config.get('orc', {})
         self.max_rows = config.get("max_rows", {})
+        self.with_data = config.get("with_data", False)
+        self.objstorage = None
+        if self.with_data:
+            assert "objstorage" in config
+            self.objstorage = get_objstorage(**config["objstorage"])
         self._reset()
 
     def _reset(self):
@@ -331,6 +339,16 @@
 
     def process_content(self, content):
         content_writer = self.get_writer_for("content")
+        data = None
+        if self.with_data:
+            obj_id = content[ID_HASH_ALGO]
+            try:
+                data = self.objstorage.get(obj_id)
+            except ObjNotFoundError:
+                logger.warning(f"Missing object {hash_to_hex(obj_id)}")
+                # if self.reporter is not None:
+                #     self.reporter("missing_content", hash_to_hex(obj_id))
+
         content_writer.write(
             (
                 hash_to_hex_or_none(content["sha1"]),
@@ -339,6 +357,7 @@
                 hash_to_hex_or_none(content["blake2s256"]),
                 content["length"],
                 content["status"],
+                data,
             )
         )
 
diff --git a/swh/dataset/relational.py b/swh/dataset/relational.py
--- a/swh/dataset/relational.py
+++ b/swh/dataset/relational.py
@@ -86,6 +86,7 @@
         ("blake2s256", "string"),
         ("length", "bigint"),
         ("status", "string"),
+        ("data", "binary"),
     ],
     "skipped_content": [
         ("sha1", "string"),
diff --git a/swh/dataset/test/test_orc.py b/swh/dataset/test/test_orc.py
--- a/swh/dataset/test/test_orc.py
+++ b/swh/dataset/test/test_orc.py
@@ -7,14 +7,9 @@
 import pyorc
 import pytest
 
-from swh.dataset.exporters.orc import (
-    ORCExporter,
-    SWHTimestampConverter,
-    datetime_to_tuple,
-    hash_to_hex_or_none,
-    swh_date_to_tuple,
-)
+from swh.dataset.exporters import orc
 from swh.model.tests.swh_model_data import TEST_OBJECTS
+from swh.objstorage.factory import get_objstorage
 
 
 @contextmanager
@@ -32,7 +27,7 @@
     with orc_tmpdir(tmpdir) as tmpdir:
         if config is None:
             config = {}
-        with ORCExporter(config, tmpdir) as exporter:
+        with orc.ORCExporter(config, tmpdir) as exporter:
             for object_type, objects in messages.items():
                 for obj in objects:
                     exporter.process_object(object_type, obj.to_dict())
@@ -47,7 +42,7 @@
         with orc_file.open("rb") as orc_obj:
             reader = pyorc.Reader(
                 orc_obj,
-                converters={pyorc.TypeKind.TIMESTAMP: SWHTimestampConverter},
+                converters={pyorc.TypeKind.TIMESTAMP: orc.SWHTimestampConverter},
             )
             obj_type = reader.user_metadata["swh_object_type"].decode()
             res[obj_type].extend(reader)
@@ -70,7 +65,7 @@
     obj_type = "origin_visit"
     output = exporter({obj_type: TEST_OBJECTS[obj_type]})
     for obj in TEST_OBJECTS[obj_type]:
-        assert (obj.origin, obj.visit, datetime_to_tuple(obj.date), obj.type) in output[
+        assert (obj.origin, obj.visit, orc.datetime_to_tuple(obj.date), obj.type) in output[
             obj_type
         ]
 
@@ -82,9 +77,9 @@
         assert (
             obj.origin,
             obj.visit,
-            datetime_to_tuple(obj.date),
+            orc.datetime_to_tuple(obj.date),
             obj.status,
-            hash_to_hex_or_none(obj.snapshot),
+            orc.hash_to_hex_or_none(obj.snapshot),
             obj.type,
         ) in output[obj_type]
 
@@ -93,14 +88,14 @@
     obj_type = "snapshot"
     output = exporter({obj_type: TEST_OBJECTS[obj_type]})
     for obj in TEST_OBJECTS[obj_type]:
-        assert (hash_to_hex_or_none(obj.id),) in output["snapshot"]
+        assert (orc.hash_to_hex_or_none(obj.id),) in output["snapshot"]
         for branch_name, branch in obj.branches.items():
             if branch is None:
                 continue
             assert (
-                hash_to_hex_or_none(obj.id),
+                orc.hash_to_hex_or_none(obj.id),
                 branch_name,
-                hash_to_hex_or_none(branch.target),
+                orc.hash_to_hex_or_none(branch.target),
                 str(branch.target_type.value),
             ) in output["snapshot_branch"]
 
@@ -110,13 +105,13 @@
     output = exporter({obj_type: TEST_OBJECTS[obj_type]})
     for obj in TEST_OBJECTS[obj_type]:
         assert (
-            hash_to_hex_or_none(obj.id),
+            orc.hash_to_hex_or_none(obj.id),
             obj.name,
             obj.message,
-            hash_to_hex_or_none(obj.target),
+            orc.hash_to_hex_or_none(obj.target),
             obj.target_type.value,
             obj.author.fullname if obj.author else None,
-            *swh_date_to_tuple(obj.date.to_dict() if obj.date is not None else None),
+            *orc.swh_date_to_tuple(obj.date.to_dict() if obj.date is not None else None),
             obj.raw_manifest,
         ) in output[obj_type]
 
@@ -126,22 +121,22 @@
     output = exporter({obj_type: TEST_OBJECTS[obj_type]})
     for obj in TEST_OBJECTS[obj_type]:
         assert (
-            hash_to_hex_or_none(obj.id),
+            orc.hash_to_hex_or_none(obj.id),
             obj.message,
             obj.author.fullname,
-            *swh_date_to_tuple(obj.date.to_dict() if obj.date is not None else None),
+            *orc.swh_date_to_tuple(obj.date.to_dict() if obj.date is not None else None),
             obj.committer.fullname,
-            *swh_date_to_tuple(
+            *orc.swh_date_to_tuple(
                 obj.committer_date.to_dict() if obj.committer_date is not None else None
             ),
-            hash_to_hex_or_none(obj.directory),
+            orc.hash_to_hex_or_none(obj.directory),
             obj.type.value,
             obj.raw_manifest,
         ) in output["revision"]
         for i, parent in enumerate(obj.parents):
             assert (
-                hash_to_hex_or_none(obj.id),
-                hash_to_hex_or_none(parent),
+                orc.hash_to_hex_or_none(obj.id),
+                orc.hash_to_hex_or_none(parent),
                 i,
             ) in output["revision_history"]
 
@@ -150,13 +145,13 @@
     obj_type = "directory"
     output = exporter({obj_type: TEST_OBJECTS[obj_type]})
     for obj in TEST_OBJECTS[obj_type]:
-        assert (hash_to_hex_or_none(obj.id), obj.raw_manifest) in output["directory"]
+        assert (orc.hash_to_hex_or_none(obj.id), obj.raw_manifest) in output["directory"]
         for entry in obj.entries:
             assert (
-                hash_to_hex_or_none(obj.id),
+                orc.hash_to_hex_or_none(obj.id),
                 entry.name,
                 entry.type,
-                hash_to_hex_or_none(entry.target),
+                orc.hash_to_hex_or_none(entry.target),
                 entry.perms,
             ) in output["directory_entry"]
 
@@ -166,12 +161,13 @@
     output = exporter({obj_type: TEST_OBJECTS[obj_type]})
     for obj in TEST_OBJECTS[obj_type]:
         assert (
-            hash_to_hex_or_none(obj.sha1),
-            hash_to_hex_or_none(obj.sha1_git),
-            hash_to_hex_or_none(obj.sha256),
-            hash_to_hex_or_none(obj.blake2s256),
+            orc.hash_to_hex_or_none(obj.sha1),
+            orc.hash_to_hex_or_none(obj.sha1_git),
+            orc.hash_to_hex_or_none(obj.sha256),
+            orc.hash_to_hex_or_none(obj.blake2s256),
             obj.length,
             obj.status,
+            None,
         ) in output[obj_type]
 
 
@@ -180,10 +176,10 @@
     output = exporter({obj_type: TEST_OBJECTS[obj_type]})
     for obj in TEST_OBJECTS[obj_type]:
         assert (
-            hash_to_hex_or_none(obj.sha1),
-            hash_to_hex_or_none(obj.sha1_git),
-            hash_to_hex_or_none(obj.sha256),
-            hash_to_hex_or_none(obj.blake2s256),
+            orc.hash_to_hex_or_none(obj.sha1),
+            orc.hash_to_hex_or_none(obj.sha1_git),
+            orc.hash_to_hex_or_none(obj.sha256),
+            orc.hash_to_hex_or_none(obj.blake2s256),
             obj.length,
             obj.status,
             obj.reason,
@@ -192,13 +188,13 @@
 def test_date_to_tuple():
     ts = {"seconds": 123456, "microseconds": 1515}
-    assert swh_date_to_tuple({"timestamp": ts, "offset_bytes": b"+0100"}) == (
+    assert orc.swh_date_to_tuple({"timestamp": ts, "offset_bytes": b"+0100"}) == (
         (123456, 1515),
         60,
         b"+0100",
     )
 
-    assert swh_date_to_tuple(
+    assert orc.swh_date_to_tuple(
         {
             "timestamp": ts,
             "offset": 120,
@@ -207,11 +203,11 @@
         }
     ) == ((123456, 1515), 60, b"+0100")
 
-    assert swh_date_to_tuple(
+    assert orc.swh_date_to_tuple(
         {"timestamp": ts, "offset": 120, "negative_utc": False,}
     ) == ((123456, 1515), 120, b"+0200")
 
-    assert swh_date_to_tuple({"timestamp": ts, "offset": 0, "negative_utc": True,}) == (
+    assert orc.swh_date_to_tuple({"timestamp": ts, "offset": 0, "negative_utc": True,}) == (
         (123456, 1515),
         0,
         b"-0000",
@@ -264,7 +260,7 @@
     for orc_file in orcfiles:
         with orc_file.open("rb") as orc_obj:
             reader = pyorc.Reader(
-                orc_obj, converters={pyorc.TypeKind.TIMESTAMP: SWHTimestampConverter},
+                orc_obj, converters={pyorc.TypeKind.TIMESTAMP: orc.SWHTimestampConverter},
             )
             uuid = reader.user_metadata["swh_uuid"].decode()
             assert orc_file.basename == f"{obj_type}-{uuid}.orc"
@@ -277,10 +273,41 @@
         with orc_file.open("rb") as orc_obj:
             reader = pyorc.Reader(
                 orc_obj,
-                converters={pyorc.TypeKind.TIMESTAMP: SWHTimestampConverter},
+                converters={pyorc.TypeKind.TIMESTAMP: orc.SWHTimestampConverter},
             )
             assert reader.user_metadata["swh_uuid"].decode() == uuid
             rows = list(reader)
             # check branches in this file only concern current snapshot (obj_id)
             for row in rows:
                 assert row[0] in obj_ids
+
+
+def test_export_content_with_data(monkeypatch, tmpdir):
+    obj_type = "content"
+    objstorage = get_objstorage("memory")
+    for content in TEST_OBJECTS[obj_type]:
+        objstorage.add(content.data)
+
+    def get_objstorage_mock(**kw):
+        if kw.get("cls") == "mock":
+            return objstorage
+
+    monkeypatch.setattr(orc, "get_objstorage", get_objstorage_mock)
+    config = {
+        "orc": {
+            "with_data": True,
+            "objstorage": {"cls": "mock"},
+        },
+    }
+
+    output = exporter({obj_type: TEST_OBJECTS[obj_type]}, config=config, tmpdir=tmpdir)
+    for obj in TEST_OBJECTS[obj_type]:
+        assert (
+            orc.hash_to_hex_or_none(obj.sha1),
+            orc.hash_to_hex_or_none(obj.sha1_git),
+            orc.hash_to_hex_or_none(obj.sha256),
+            orc.hash_to_hex_or_none(obj.blake2s256),
+            obj.length,
+            obj.status,
+            obj.data,
+        ) in output[obj_type]