diff --git a/requirements-swh.txt b/requirements-swh.txt --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -2,4 +2,5 @@ swh.core[http] >= 2 swh.journal >= 0.9 swh.model >= 4.3 +swh.objstorage swh.storage diff --git a/swh/dataset/exporters/orc.py b/swh/dataset/exporters/orc.py --- a/swh/dataset/exporters/orc.py +++ b/swh/dataset/exporters/orc.py @@ -29,6 +29,8 @@ from swh.dataset.utils import remove_pull_requests from swh.model.hashutil import hash_to_hex from swh.model.model import TimestampWithTimezone +from swh.objstorage.factory import get_objstorage +from swh.objstorage.objstorage import ID_HASH_ALGO, ObjNotFoundError ORC_TYPE_MAP = { "string": String, @@ -127,6 +129,13 @@ "for now.", invalid_tables, ) + # Optionally embed each content's raw bytes, fetched from an objstorage. + self.with_data = config.get("with_data", False) + self.objstorage = None + if self.with_data: + if "objstorage" not in config: + raise ValueError("'with_data' requires an 'objstorage' configuration") + self.objstorage = get_objstorage(**config["objstorage"]) self._reset() def _reset(self): @@ -330,6 +339,15 @@ def process_content(self, content): content_writer = self.get_writer_for("content") + data = None + if self.with_data: + obj_id = content[ID_HASH_ALGO] + try: + data = self.objstorage.get(obj_id) + except ObjNotFoundError: + # Export the row with a null data column rather than failing. + logger.warning("Missing object %s", hash_to_hex(obj_id)) + content_writer.write( ( hash_to_hex_or_none(content["sha1"]), @@ -338,6 +356,7 @@ hash_to_hex_or_none(content["blake2s256"]), content["length"], content["status"], + data, ) ) diff --git a/swh/dataset/relational.py b/swh/dataset/relational.py --- a/swh/dataset/relational.py +++ b/swh/dataset/relational.py @@ -67,6 +67,7 @@ ("blake2s256", "string"), ("length", "bigint"), ("status", "string"), + ("data", "binary"), ], "skipped_content": [ ("sha1", "string"), diff --git a/swh/dataset/test/test_orc.py b/swh/dataset/test/test_orc.py --- a/swh/dataset/test/test_orc.py +++ 
b/swh/dataset/test/test_orc.py @@ -12,15 +12,10 @@ import pyorc import pytest -from swh.dataset.exporters.orc import ( - ORCExporter, - SWHTimestampConverter, - datetime_to_tuple, -
hash_to_hex_or_none, - swh_date_to_tuple, -) +from swh.dataset.exporters import orc from swh.dataset.relational import MAIN_TABLES, RELATION_TABLES from swh.model.tests.swh_model_data import TEST_OBJECTS +from swh.objstorage.factory import get_objstorage @contextmanager @@ -38,7 +33,7 @@ with orc_tmpdir(tmpdir) as tmpdir: if config is None: config = {} - with ORCExporter(config, tmpdir) as exporter: + with orc.ORCExporter(config, tmpdir) as exporter: for object_type, objects in messages.items(): for obj in objects: exporter.process_object(object_type, obj.to_dict()) @@ -53,7 +48,7 @@ with orc_file.open("rb") as orc_obj: reader = pyorc.Reader( orc_obj, - converters={pyorc.TypeKind.TIMESTAMP: SWHTimestampConverter}, + converters={pyorc.TypeKind.TIMESTAMP: orc.SWHTimestampConverter}, ) obj_type = reader.user_metadata["swh_object_type"].decode() res[obj_type].extend(reader) @@ -76,9 +71,12 @@ obj_type = "origin_visit" output = exporter({obj_type: TEST_OBJECTS[obj_type]}) for obj in TEST_OBJECTS[obj_type]: - assert (obj.origin, obj.visit, datetime_to_tuple(obj.date), obj.type) in output[ - obj_type - ] + assert ( + obj.origin, + obj.visit, + orc.datetime_to_tuple(obj.date), + obj.type, + ) in output[obj_type] def test_export_origin_visit_status(): @@ -88,9 +86,9 @@ assert ( obj.origin, obj.visit, - datetime_to_tuple(obj.date), + orc.datetime_to_tuple(obj.date), obj.status, - hash_to_hex_or_none(obj.snapshot), + orc.hash_to_hex_or_none(obj.snapshot), obj.type, ) in output[obj_type] @@ -99,14 +97,14 @@ obj_type = "snapshot" output = exporter({obj_type: TEST_OBJECTS[obj_type]}) for obj in TEST_OBJECTS[obj_type]: - assert (hash_to_hex_or_none(obj.id),) in output["snapshot"] + assert (orc.hash_to_hex_or_none(obj.id),) in output["snapshot"] for branch_name, branch in obj.branches.items(): if branch is None: continue assert ( - hash_to_hex_or_none(obj.id), + orc.hash_to_hex_or_none(obj.id), branch_name, - hash_to_hex_or_none(branch.target), +
orc.hash_to_hex_or_none(branch.target), str(branch.target_type.value), ) in output["snapshot_branch"] @@ -116,13 +114,15 @@ output = exporter({obj_type: TEST_OBJECTS[obj_type]}) for obj in TEST_OBJECTS[obj_type]: assert ( - hash_to_hex_or_none(obj.id), + orc.hash_to_hex_or_none(obj.id), obj.name, obj.message, - hash_to_hex_or_none(obj.target), + orc.hash_to_hex_or_none(obj.target), obj.target_type.value, obj.author.fullname if obj.author else None, - *swh_date_to_tuple(obj.date.to_dict() if obj.date is not None else None), + *orc.swh_date_to_tuple( + obj.date.to_dict() if obj.date is not None else None + ), obj.raw_manifest, ) in output[obj_type] @@ -132,22 +132,24 @@ output = exporter({obj_type: TEST_OBJECTS[obj_type]}) for obj in TEST_OBJECTS[obj_type]: assert ( - hash_to_hex_or_none(obj.id), + orc.hash_to_hex_or_none(obj.id), obj.message, obj.author.fullname, - *swh_date_to_tuple(obj.date.to_dict() if obj.date is not None else None), + *orc.swh_date_to_tuple( + obj.date.to_dict() if obj.date is not None else None + ), obj.committer.fullname, - *swh_date_to_tuple( + *orc.swh_date_to_tuple( obj.committer_date.to_dict() if obj.committer_date is not None else None ), - hash_to_hex_or_none(obj.directory), + orc.hash_to_hex_or_none(obj.directory), obj.type.value, obj.raw_manifest, ) in output["revision"] for i, parent in enumerate(obj.parents): assert ( - hash_to_hex_or_none(obj.id), - hash_to_hex_or_none(parent), + orc.hash_to_hex_or_none(obj.id), + orc.hash_to_hex_or_none(parent), i, ) in output["revision_history"] @@ -156,13 +158,15 @@ obj_type = "directory" output = exporter({obj_type: TEST_OBJECTS[obj_type]}) for obj in TEST_OBJECTS[obj_type]: - assert (hash_to_hex_or_none(obj.id), obj.raw_manifest) in output["directory"] + assert (orc.hash_to_hex_or_none(obj.id), obj.raw_manifest) in output[ + "directory" + ] for entry in obj.entries: assert ( - hash_to_hex_or_none(obj.id), + orc.hash_to_hex_or_none(obj.id), entry.name, entry.type, -
hash_to_hex_or_none(entry.target), + orc.hash_to_hex_or_none(entry.target), entry.perms, ) in output["directory_entry"] @@ -172,12 +176,13 @@ output = exporter({obj_type: TEST_OBJECTS[obj_type]}) for obj in TEST_OBJECTS[obj_type]: assert ( - hash_to_hex_or_none(obj.sha1), - hash_to_hex_or_none(obj.sha1_git), - hash_to_hex_or_none(obj.sha256), - hash_to_hex_or_none(obj.blake2s256), + orc.hash_to_hex_or_none(obj.sha1), + orc.hash_to_hex_or_none(obj.sha1_git), + orc.hash_to_hex_or_none(obj.sha256), + orc.hash_to_hex_or_none(obj.blake2s256), obj.length, obj.status, + None, ) in output[obj_type] @@ -186,10 +191,10 @@ output = exporter({obj_type: TEST_OBJECTS[obj_type]}) for obj in TEST_OBJECTS[obj_type]: assert ( - hash_to_hex_or_none(obj.sha1), - hash_to_hex_or_none(obj.sha1_git), - hash_to_hex_or_none(obj.sha256), - hash_to_hex_or_none(obj.blake2s256), + orc.hash_to_hex_or_none(obj.sha1), + orc.hash_to_hex_or_none(obj.sha1_git), + orc.hash_to_hex_or_none(obj.sha256), + orc.hash_to_hex_or_none(obj.blake2s256), obj.length, obj.status, obj.reason, @@ -198,13 +203,13 @@ def test_date_to_tuple(): ts = {"seconds": 123456, "microseconds": 1515} - assert swh_date_to_tuple({"timestamp": ts, "offset_bytes": b"+0100"}) == ( + assert orc.swh_date_to_tuple({"timestamp": ts, "offset_bytes": b"+0100"}) == ( (123456, 1515), 60, b"+0100", ) - assert swh_date_to_tuple( + assert orc.swh_date_to_tuple( { "timestamp": ts, "offset": 120, @@ -213,15 +218,13 @@ } ) == ((123456, 1515), 60, b"+0100") - assert swh_date_to_tuple( + assert orc.swh_date_to_tuple( {"timestamp": ts, "offset": 120, "negative_utc": False,} ) == ((123456, 1515), 120, b"+0200") - assert swh_date_to_tuple({"timestamp": ts, "offset": 0, "negative_utc": True,}) == ( - (123456, 1515), - 0, - b"-0000", - ) + assert orc.swh_date_to_tuple( + {"timestamp": ts, "offset": 0, "negative_utc": True,} + ) == ((123456, 1515), 0, b"-0000",) # mapping of related tables for each main table (if any) @@ -259,7 +262,8 @@ for orc_file in
orcfiles: with orc_file.open("rb") as orc_obj: reader = pyorc.Reader( - orc_obj, converters={pyorc.TypeKind.TIMESTAMP: SWHTimestampConverter}, + orc_obj, + converters={pyorc.TypeKind.TIMESTAMP: orc.SWHTimestampConverter}, ) uuid = reader.user_metadata["swh_uuid"].decode() assert orc_file.basename == f"{obj_type}-{uuid}.orc" @@ -272,7 +276,7 @@ with orc_file.open("rb") as orc_obj: reader = pyorc.Reader( orc_obj, - converters={pyorc.TypeKind.TIMESTAMP: SWHTimestampConverter}, + converters={pyorc.TypeKind.TIMESTAMP: orc.SWHTimestampConverter}, ) assert reader.user_metadata["swh_uuid"].decode() == uuid rows = list(reader) @@ -302,3 +306,55 @@ config = {"orc": {"max_rows": {table_name: 10}}} with pytest.raises(ValueError): exporter({}, config=config) + + +def test_export_content_with_data(monkeypatch, tmpdir): + obj_type = "content" + objstorage = get_objstorage("memory") + for content in TEST_OBJECTS[obj_type]: + objstorage.add(content.data) + + def get_objstorage_mock(**kw): + assert kw == {"cls": "mock"} + return objstorage + + monkeypatch.setattr(orc, "get_objstorage", get_objstorage_mock) + config = { + "orc": {"with_data": True, "objstorage": {"cls": "mock"},}, + } + + output = exporter({obj_type: TEST_OBJECTS[obj_type]}, config=config, tmpdir=tmpdir) + for obj in TEST_OBJECTS[obj_type]: + assert ( + orc.hash_to_hex_or_none(obj.sha1), + orc.hash_to_hex_or_none(obj.sha1_git), + orc.hash_to_hex_or_none(obj.sha256), + orc.hash_to_hex_or_none(obj.blake2s256), + obj.length, + obj.status, + obj.data, + ) in output[obj_type] + + +def test_export_content_with_missing_data(monkeypatch, tmpdir): + obj_type = "content" + empty_objstorage = get_objstorage("memory") + monkeypatch.setattr(orc, "get_objstorage", lambda **kw: empty_objstorage) + config = {"orc": {"with_data": True, "objstorage": {"cls": "mock"}}} + + output = exporter({obj_type: TEST_OBJECTS[obj_type]}, config=config, tmpdir=tmpdir) + for obj in TEST_OBJECTS[obj_type]: + assert ( + 
orc.hash_to_hex_or_none(obj.sha1), + orc.hash_to_hex_or_none(obj.sha1_git), + orc.hash_to_hex_or_none(obj.sha256), + orc.hash_to_hex_or_none(obj.blake2s256), + obj.length, + obj.status, + None, + ) in output[obj_type] + + +def test_export_content_with_data_requires_objstorage(): + with pytest.raises(ValueError): + exporter({}, config={"orc": {"with_data": True}})