diff --git a/requirements-swh.txt b/requirements-swh.txt
index ee0e353..5b1280c 100644
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,5 +1,6 @@
 # Add here internal Software Heritage dependencies, one per line.
 swh.core[http] >= 2
 swh.journal >= 0.9
 swh.model >= 4.3
+swh.objstorage
 swh.storage
diff --git a/swh/dataset/exporters/orc.py b/swh/dataset/exporters/orc.py
index ffa5b06..6d99c0d 100644
--- a/swh/dataset/exporters/orc.py
+++ b/swh/dataset/exporters/orc.py
@@ -1,356 +1,372 @@
 # Copyright (C) 2020-2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from datetime import datetime
 import logging
 import math
 from types import TracebackType
 from typing import Any, Optional, Tuple, Type, cast
 
 from pkg_resources import get_distribution
 from pyorc import (
     BigInt,
     Binary,
     CompressionKind,
     Int,
     SmallInt,
     String,
     Struct,
     Timestamp,
     TypeKind,
     Writer,
 )
 from pyorc.converters import ORCConverter
 
 from swh.dataset.exporter import ExporterDispatch
 from swh.dataset.relational import MAIN_TABLES, TABLES
 from swh.dataset.utils import remove_pull_requests
 from swh.model.hashutil import hash_to_hex
 from swh.model.model import TimestampWithTimezone
+from swh.objstorage.factory import get_objstorage
+from swh.objstorage.objstorage import ID_HASH_ALGO, ObjNotFoundError
 
 ORC_TYPE_MAP = {
     "string": String,
     "smallint": SmallInt,
     "int": Int,
     "bigint": BigInt,
     "timestamp": Timestamp,
     "binary": Binary,
 }
 
 EXPORT_SCHEMA = {
     table_name: Struct(
         **{
             column_name: ORC_TYPE_MAP[column_type]()
             for column_name, column_type in columns
         }
     )
     for table_name, columns in TABLES.items()
 }
 
 logger = logging.getLogger(__name__)
 
 
 def hash_to_hex_or_none(hash):
     return hash_to_hex(hash) if hash is not None else None
 
 
 def swh_date_to_tuple(obj):
     if obj is None or obj["timestamp"] is None:
         return (None, None, None)
     offset_bytes = obj.get("offset_bytes")
     if offset_bytes is None:
         offset = obj.get("offset", 0)
         negative = offset < 0 or obj.get("negative_utc", False)
         (hours, minutes) = divmod(abs(offset), 60)
         offset_bytes = f"{'-' if negative else '+'}{hours:02}{minutes:02}".encode()
     else:
         offset = TimestampWithTimezone._parse_offset_bytes(offset_bytes)
     return (
         (obj["timestamp"]["seconds"], obj["timestamp"]["microseconds"]),
         offset,
         offset_bytes,
     )
 
 
 def datetime_to_tuple(obj: Optional[datetime]) -> Optional[Tuple[int, int]]:
     if obj is None:
         return None
     return (math.floor(obj.timestamp()), obj.microsecond)
 
 
 class SWHTimestampConverter:
     """This is an ORCConverter compatible class to convert timestamps from/to ORC
     files
 
     timestamps in python are given as a couple (seconds, microseconds) and are
     serialized as a couple (seconds, nanoseconds) in the ORC file.
 
     Reimplemented because we do not want the Python object to be converted as
     ORC timestamp to be Python datatime objects, since swh.model's Timestamp
     cannot be converted without loss a Python datetime objects.
     """
 
     # use Any as timezone annotation to make it easier to run mypy on python <
     # 3.9, plus we do not use the timezone argument here...
     @staticmethod
     def from_orc(seconds: int, nanoseconds: int, timezone: Any,) -> Tuple[int, int]:
         return (seconds, nanoseconds // 1000)
 
     @staticmethod
     def to_orc(
         obj: Optional[Tuple[int, int]], timezone: Any,
     ) -> Optional[Tuple[int, int]]:
         if obj is None:
             return None
         return (obj[0], obj[1] * 1000 if obj[1] is not None else None)
 
 
 class ORCExporter(ExporterDispatch):
     """
     Implementation of an exporter which writes the entire graph dataset as
     ORC files. Useful for large scale processing, notably on cloud instances
     (e.g BigQuery, Amazon Athena, Azure).
     """
 
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
         config = self.config.get("orc", {})
         self.max_rows = config.get("max_rows", {})
         invalid_tables = [
             table_name for table_name in self.max_rows if table_name not in MAIN_TABLES
         ]
         if invalid_tables:
             raise ValueError(
                 "Limiting the number of secondary table (%s) is not supported "
                 "for now.",
                 invalid_tables,
             )
+        self.with_data = config.get("with_data", False)
+        self.objstorage = None
+        if self.with_data:
+            assert "objstorage" in config
+            self.objstorage = get_objstorage(**config["objstorage"])
         self._reset()
 
     def _reset(self):
         self.writers = {}
         self.writer_files = {}
         self.uuids = {}
         self.uuid_main_table = {}
 
     def __exit__(
         self,
         exc_type: Optional[Type[BaseException]],
         exc_value: Optional[BaseException],
         traceback: Optional[TracebackType],
     ) -> Optional[bool]:
         for writer in self.writers.values():
             writer.close()
         for fileobj in self.writer_files.values():
             fileobj.close()
         self._reset()
         return super().__exit__(exc_type, exc_value, traceback)
 
     def maybe_close_writer_for(self, table_name: str):
         uuid = self.uuids.get(table_name)
         if (
             uuid is not None
             and table_name in self.max_rows
             and self.writers[table_name].current_row >= self.max_rows[table_name]
         ):
             main_table = self.uuid_main_table[uuid]
             if table_name != main_table:
                 logger.warning(
                     "Limiting the number of secondary table (%s) is not supported "
                     "for now (size limit ignored).",
                     table_name,
                 )
             else:
                 # sync/close all tables having the current uuid (aka main and
                 # related tables)
                 for table in [
                     tname for tname, tuuid in self.uuids.items() if tuuid == uuid
                 ]:
                     # close the writer and remove from the writers dict
                     self.writers.pop(table).close()
                     self.writer_files.pop(table).close()
                     # and clean uuids dicts
                     self.uuids.pop(table)
                     self.uuid_main_table.pop(uuid, None)
 
     def get_writer_for(self, table_name: str, unique_id=None):
         self.maybe_close_writer_for(table_name)
         if table_name not in self.writers:
             object_type_dir = self.export_path / table_name
             object_type_dir.mkdir(exist_ok=True)
             if unique_id is None:
                 unique_id = self.get_unique_file_id()
                 self.uuid_main_table[unique_id] = table_name
             export_file = object_type_dir / (f"{table_name}-{unique_id}.orc")
             export_obj = export_file.open("wb")
             self.writer_files[table_name] = export_obj
             self.writers[table_name] = Writer(
                 export_obj,
                 EXPORT_SCHEMA[table_name],
                 compression=CompressionKind.ZSTD,
                 converters={
                     TypeKind.TIMESTAMP: cast(Type[ORCConverter], SWHTimestampConverter)
                 },
             )
             self.writers[table_name].set_user_metadata(
                 swh_object_type=table_name.encode(),
                 swh_uuid=unique_id.encode(),
                 swh_model_version=get_distribution("swh.model").version.encode(),
                 swh_dataset_version=get_distribution("swh.dataset").version.encode(),
                 # maybe put a copy of the config (redacted) also?
             )
             self.uuids[table_name] = unique_id
         return self.writers[table_name]
 
     def process_origin(self, origin):
         origin_writer = self.get_writer_for("origin")
         origin_writer.write((origin["url"],))
 
     def process_origin_visit(self, visit):
         origin_visit_writer = self.get_writer_for("origin_visit")
         origin_visit_writer.write(
             (
                 visit["origin"],
                 visit["visit"],
                 datetime_to_tuple(visit["date"]),
                 visit["type"],
             )
         )
 
     def process_origin_visit_status(self, visit_status):
         origin_visit_status_writer = self.get_writer_for("origin_visit_status")
         origin_visit_status_writer.write(
             (
                 visit_status["origin"],
                 visit_status["visit"],
                 datetime_to_tuple(visit_status["date"]),
                 visit_status["status"],
                 hash_to_hex_or_none(visit_status["snapshot"]),
                 visit_status["type"],
             )
         )
 
     def process_snapshot(self, snapshot):
         if self.config.get("orc", {}).get("remove_pull_requests"):
             remove_pull_requests(snapshot)
         snapshot_writer = self.get_writer_for("snapshot")
         snapshot_writer.write((hash_to_hex_or_none(snapshot["id"]),))
 
         # we want to store branches in the same directory as snapshot objects,
         # and have both files have the same UUID.
         snapshot_branch_writer = self.get_writer_for(
             "snapshot_branch", unique_id=self.uuids["snapshot"],
         )
         for branch_name, branch in snapshot["branches"].items():
             if branch is None:
                 continue
             snapshot_branch_writer.write(
                 (
                     hash_to_hex_or_none(snapshot["id"]),
                     branch_name,
                     hash_to_hex_or_none(branch["target"]),
                     branch["target_type"],
                 )
             )
 
     def process_release(self, release):
         release_writer = self.get_writer_for("release")
         release_writer.write(
             (
                 hash_to_hex_or_none(release["id"]),
                 release["name"],
                 release["message"],
                 hash_to_hex_or_none(release["target"]),
                 release["target_type"],
                 (release.get("author") or {}).get("fullname"),
                 *swh_date_to_tuple(release["date"]),
                 release.get("raw_manifest"),
             )
         )
 
     def process_revision(self, revision):
         release_writer = self.get_writer_for("revision")
         release_writer.write(
             (
                 hash_to_hex_or_none(revision["id"]),
                 revision["message"],
                 revision["author"]["fullname"],
                 *swh_date_to_tuple(revision["date"]),
                 revision["committer"]["fullname"],
                 *swh_date_to_tuple(revision["committer_date"]),
                 hash_to_hex_or_none(revision["directory"]),
                 revision["type"],
                 revision.get("raw_manifest"),
             )
         )
 
         revision_history_writer = self.get_writer_for(
             "revision_history", unique_id=self.uuids["revision"],
         )
         for i, parent_id in enumerate(revision["parents"]):
             revision_history_writer.write(
                 (
                     hash_to_hex_or_none(revision["id"]),
                     hash_to_hex_or_none(parent_id),
                     i,
                 )
             )
 
         revision_header_writer = self.get_writer_for(
             "revision_extra_headers", unique_id=self.uuids["revision"],
         )
         for key, value in revision["extra_headers"]:
             revision_header_writer.write(
                 (hash_to_hex_or_none(revision["id"]), key, value)
             )
 
     def process_directory(self, directory):
         directory_writer = self.get_writer_for("directory")
         directory_writer.write(
             (hash_to_hex_or_none(directory["id"]), directory.get("raw_manifest"),)
         )
 
         directory_entry_writer = self.get_writer_for(
             "directory_entry", unique_id=self.uuids["directory"],
         )
         for entry in directory["entries"]:
             directory_entry_writer.write(
                 (
                     hash_to_hex_or_none(directory["id"]),
                     entry["name"],
                     entry["type"],
                     hash_to_hex_or_none(entry["target"]),
                     entry["perms"],
                 )
             )
 
     def process_content(self, content):
         content_writer = self.get_writer_for("content")
+        data = None
+        if self.with_data:
+            obj_id = content[ID_HASH_ALGO]
+            try:
+                data = self.objstorage.get(obj_id)
+            except ObjNotFoundError:
+                logger.warning(f"Missing object {hash_to_hex(obj_id)}")
+
         content_writer.write(
             (
                 hash_to_hex_or_none(content["sha1"]),
                 hash_to_hex_or_none(content["sha1_git"]),
                 hash_to_hex_or_none(content["sha256"]),
                 hash_to_hex_or_none(content["blake2s256"]),
                 content["length"],
                 content["status"],
+                data,
             )
         )
 
     def process_skipped_content(self, skipped_content):
         skipped_content_writer = self.get_writer_for("skipped_content")
         skipped_content_writer.write(
             (
                 hash_to_hex_or_none(skipped_content["sha1"]),
                 hash_to_hex_or_none(skipped_content["sha1_git"]),
                 hash_to_hex_or_none(skipped_content["sha256"]),
                 hash_to_hex_or_none(skipped_content["blake2s256"]),
                 skipped_content["length"],
                 skipped_content["status"],
                 skipped_content["reason"],
             )
         )
diff --git a/swh/dataset/relational.py b/swh/dataset/relational.py
index 2430805..3e8b0d1 100644
--- a/swh/dataset/relational.py
+++ b/swh/dataset/relational.py
@@ -1,110 +1,111 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 # fmt: off
 MAIN_TABLES = {
     "origin": [
         ("url", "string"),
     ],
     "origin_visit": [
         ("origin", "string"),
         ("visit", "bigint"),
         ("date", "timestamp"),
         ("type", "string"),
     ],
     "origin_visit_status": [
         ("origin", "string"),
         ("visit", "bigint"),
         ("date", "timestamp"),
         ("status", "string"),
         ("snapshot", "string"),
         ("type", "string"),
     ],
     "snapshot": [
         ("id", "string"),
     ],
     # snapshot_branches is in RELATED_TABLES
     "release": [
         ("id", "string"),
         ("name", "binary"),
         ("message", "binary"),
         ("target", "string"),
         ("target_type", "string"),
         ("author", "binary"),
         ("date", "timestamp"),
         ("date_offset", "smallint"),
         ("date_raw_offset_bytes", "binary"),
         ("raw_manifest", "binary"),
     ],
     "revision": [
         ("id", "string"),
         ("message", "binary"),
         ("author", "binary"),
         ("date", "timestamp"),
         ("date_offset", "smallint"),
         ("date_raw_offset_bytes", "binary"),
         ("committer", "binary"),
         ("committer_date", "timestamp"),
         ("committer_offset", "smallint"),
         ("committer_date_raw_offset_bytes", "binary"),
         ("directory", "string"),
         ("type", "string"),
         ("raw_manifest", "binary"),
     ],
     # revision_history is in RELATED_TABLES
     # revision_extra_headers is in RELATED_TABLES
     "directory": [
         ("id", "string"),
         ("raw_manifest", "binary"),
     ],
     # direcory_entry is in RELATED_TABLES
     "content": [
         ("sha1", "string"),
         ("sha1_git", "string"),
         ("sha256", "string"),
         ("blake2s256", "string"),
         ("length", "bigint"),
         ("status", "string"),
+        ("data", "binary")
     ],
     "skipped_content": [
         ("sha1", "string"),
         ("sha1_git", "string"),
         ("sha256", "string"),
         ("blake2s256", "string"),
         ("length", "bigint"),
         ("status", "string"),
         ("reason", "string"),
     ],
 }
 
 RELATION_TABLES = {
     "snapshot_branch": [
         ("snapshot_id", "string"),
         ("name", "binary"),
         ("target", "string"),
         ("target_type", "string"),
     ],
     "revision_history": [
         ("id", "string"),
         ("parent_id", "string"),
         ("parent_rank", "int"),
     ],
     "revision_extra_headers": [
         ("id", "string"),
         ("key", "binary"),
         ("value", "binary"),
     ],
     "directory_entry": [
         ("directory_id", "string"),
         ("name", "binary"),
         ("type", "string"),
         ("target", "string"),
         ("perms", "int"),
     ],
 }
 
 TABLES = {**MAIN_TABLES, **RELATION_TABLES}
 # fmt: on
diff --git a/swh/dataset/test/test_orc.py b/swh/dataset/test/test_orc.py
index 5c0956a..62ca105 100644
--- a/swh/dataset/test/test_orc.py
+++ b/swh/dataset/test/test_orc.py
@@ -1,304 +1,336 @@
 # Copyright (C) 2020-2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import collections
 from contextlib import contextmanager
 import math
 from pathlib import Path
 import tempfile
 
 import pyorc
 import pytest
 
-from swh.dataset.exporters.orc import (
-    ORCExporter,
-    SWHTimestampConverter,
-    datetime_to_tuple,
-    hash_to_hex_or_none,
-    swh_date_to_tuple,
-)
+from swh.dataset.exporters import orc
 from swh.dataset.relational import MAIN_TABLES, RELATION_TABLES
 from swh.model.tests.swh_model_data import TEST_OBJECTS
+from swh.objstorage.factory import get_objstorage
 
 
 @contextmanager
 def orc_tmpdir(tmpdir):
     if tmpdir:
         yield Path(tmpdir)
     else:
         with tempfile.TemporaryDirectory() as tmpdir:
             yield Path(tmpdir)
 
 
 @contextmanager
 def orc_export(messages, config=None, tmpdir=None):
     with orc_tmpdir(tmpdir) as tmpdir:
         if config is None:
             config = {}
-        with ORCExporter(config, tmpdir) as exporter:
+        with orc.ORCExporter(config, tmpdir) as exporter:
             for object_type, objects in messages.items():
                 for obj in objects:
                     exporter.process_object(object_type, obj.to_dict())
         yield tmpdir
 
 
 def orc_load(rootdir):
     res = collections.defaultdict(list)
     res["rootdir"] = rootdir
     for obj_type_dir in rootdir.iterdir():
         for orc_file in obj_type_dir.iterdir():
             with orc_file.open("rb") as orc_obj:
                 reader = pyorc.Reader(
                     orc_obj,
-                    converters={pyorc.TypeKind.TIMESTAMP: SWHTimestampConverter},
+                    converters={pyorc.TypeKind.TIMESTAMP: orc.SWHTimestampConverter},
                 )
                 obj_type = reader.user_metadata["swh_object_type"].decode()
                 res[obj_type].extend(reader)
     return res
 
 
 def exporter(messages, config=None, tmpdir=None):
     with orc_export(messages, config, tmpdir) as exportdir:
         return orc_load(exportdir)
 
 
 def test_export_origin():
     obj_type = "origin"
     output = exporter({obj_type: TEST_OBJECTS[obj_type]})
     for obj in TEST_OBJECTS[obj_type]:
         assert (obj.url,) in output[obj_type]
 
 
 def test_export_origin_visit():
     obj_type = "origin_visit"
     output = exporter({obj_type: TEST_OBJECTS[obj_type]})
     for obj in TEST_OBJECTS[obj_type]:
-        assert (obj.origin, obj.visit, datetime_to_tuple(obj.date), obj.type) in output[
-            obj_type
-        ]
+        assert (
+            obj.origin,
+            obj.visit,
+            orc.datetime_to_tuple(obj.date),
+            obj.type,
+        ) in output[obj_type]
 
 
 def test_export_origin_visit_status():
     obj_type = "origin_visit_status"
     output = exporter({obj_type: TEST_OBJECTS[obj_type]})
     for obj in TEST_OBJECTS[obj_type]:
         assert (
             obj.origin,
             obj.visit,
-            datetime_to_tuple(obj.date),
+            orc.datetime_to_tuple(obj.date),
             obj.status,
-            hash_to_hex_or_none(obj.snapshot),
+            orc.hash_to_hex_or_none(obj.snapshot),
             obj.type,
         ) in output[obj_type]
 
 
 def test_export_snapshot():
     obj_type = "snapshot"
     output = exporter({obj_type: TEST_OBJECTS[obj_type]})
     for obj in TEST_OBJECTS[obj_type]:
-        assert (hash_to_hex_or_none(obj.id),) in output["snapshot"]
+        assert (orc.hash_to_hex_or_none(obj.id),) in output["snapshot"]
         for branch_name, branch in obj.branches.items():
             if branch is None:
                 continue
             assert (
-                hash_to_hex_or_none(obj.id),
+                orc.hash_to_hex_or_none(obj.id),
                 branch_name,
-                hash_to_hex_or_none(branch.target),
+                orc.hash_to_hex_or_none(branch.target),
                 str(branch.target_type.value),
             ) in output["snapshot_branch"]
 
 
 def test_export_release():
     obj_type = "release"
     output = exporter({obj_type: TEST_OBJECTS[obj_type]})
     for obj in TEST_OBJECTS[obj_type]:
         assert (
-            hash_to_hex_or_none(obj.id),
+            orc.hash_to_hex_or_none(obj.id),
             obj.name,
             obj.message,
-            hash_to_hex_or_none(obj.target),
+            orc.hash_to_hex_or_none(obj.target),
             obj.target_type.value,
             obj.author.fullname if obj.author else None,
-            *swh_date_to_tuple(obj.date.to_dict() if obj.date is not None else None),
+            *orc.swh_date_to_tuple(
+                obj.date.to_dict() if obj.date is not None else None
+            ),
             obj.raw_manifest,
         ) in output[obj_type]
 
 
 def test_export_revision():
     obj_type = "revision"
     output = exporter({obj_type: TEST_OBJECTS[obj_type]})
     for obj in TEST_OBJECTS[obj_type]:
         assert (
-            hash_to_hex_or_none(obj.id),
+            orc.hash_to_hex_or_none(obj.id),
             obj.message,
             obj.author.fullname,
-            *swh_date_to_tuple(obj.date.to_dict() if obj.date is not None else None),
+            *orc.swh_date_to_tuple(
+                obj.date.to_dict() if obj.date is not None else None
+            ),
             obj.committer.fullname,
-            *swh_date_to_tuple(
+            *orc.swh_date_to_tuple(
                 obj.committer_date.to_dict() if obj.committer_date is not None else None
             ),
-            hash_to_hex_or_none(obj.directory),
+            orc.hash_to_hex_or_none(obj.directory),
             obj.type.value,
             obj.raw_manifest,
         ) in output["revision"]
         for i, parent in enumerate(obj.parents):
             assert (
-                hash_to_hex_or_none(obj.id),
-                hash_to_hex_or_none(parent),
+                orc.hash_to_hex_or_none(obj.id),
+                orc.hash_to_hex_or_none(parent),
                 i,
             ) in output["revision_history"]
 
 
 def test_export_directory():
     obj_type = "directory"
     output = exporter({obj_type: TEST_OBJECTS[obj_type]})
     for obj in TEST_OBJECTS[obj_type]:
-        assert (hash_to_hex_or_none(obj.id), obj.raw_manifest) in output["directory"]
+        assert (orc.hash_to_hex_or_none(obj.id), obj.raw_manifest) in output[
+            "directory"
+        ]
         for entry in obj.entries:
             assert (
-                hash_to_hex_or_none(obj.id),
+                orc.hash_to_hex_or_none(obj.id),
                 entry.name,
                 entry.type,
-                hash_to_hex_or_none(entry.target),
+                orc.hash_to_hex_or_none(entry.target),
                 entry.perms,
             ) in output["directory_entry"]
 
 
 def test_export_content():
     obj_type = "content"
     output = exporter({obj_type: TEST_OBJECTS[obj_type]})
     for obj in TEST_OBJECTS[obj_type]:
         assert (
-            hash_to_hex_or_none(obj.sha1),
-            hash_to_hex_or_none(obj.sha1_git),
-            hash_to_hex_or_none(obj.sha256),
-            hash_to_hex_or_none(obj.blake2s256),
+            orc.hash_to_hex_or_none(obj.sha1),
+            orc.hash_to_hex_or_none(obj.sha1_git),
+            orc.hash_to_hex_or_none(obj.sha256),
+            orc.hash_to_hex_or_none(obj.blake2s256),
             obj.length,
             obj.status,
+            None,
         ) in output[obj_type]
 
 
 def test_export_skipped_content():
     obj_type = "skipped_content"
     output = exporter({obj_type: TEST_OBJECTS[obj_type]})
     for obj in TEST_OBJECTS[obj_type]:
         assert (
-            hash_to_hex_or_none(obj.sha1),
-            hash_to_hex_or_none(obj.sha1_git),
-            hash_to_hex_or_none(obj.sha256),
-            hash_to_hex_or_none(obj.blake2s256),
+            orc.hash_to_hex_or_none(obj.sha1),
+            orc.hash_to_hex_or_none(obj.sha1_git),
+            orc.hash_to_hex_or_none(obj.sha256),
+            orc.hash_to_hex_or_none(obj.blake2s256),
             obj.length,
             obj.status,
             obj.reason,
         ) in output[obj_type]
 
 
 def test_date_to_tuple():
     ts = {"seconds": 123456, "microseconds": 1515}
-    assert swh_date_to_tuple({"timestamp": ts, "offset_bytes": b"+0100"}) == (
+    assert orc.swh_date_to_tuple({"timestamp": ts, "offset_bytes": b"+0100"}) == (
         (123456, 1515),
         60,
         b"+0100",
     )
-    assert swh_date_to_tuple(
+    assert orc.swh_date_to_tuple(
        {
            "timestamp": ts,
            "offset": 120,
            "negative_utc": False,
            "offset_bytes": b"+0100",
        }
     ) == ((123456, 1515), 60, b"+0100")
-    assert swh_date_to_tuple(
+    assert orc.swh_date_to_tuple(
         {"timestamp": ts, "offset": 120, "negative_utc": False,}
     ) == ((123456, 1515), 120, b"+0200")
-    assert swh_date_to_tuple({"timestamp": ts, "offset": 0, "negative_utc": True,}) == (
-        (123456, 1515),
-        0,
-        b"-0000",
-    )
+    assert orc.swh_date_to_tuple(
+        {"timestamp": ts, "offset": 0, "negative_utc": True,}
+    ) == ((123456, 1515), 0, b"-0000",)
 
 
 # mapping of related tables for each main table (if any)
 RELATED = {
     "snapshot": ["snapshot_branch"],
     "revision": ["revision_history", "revision_extra_headers"],
     "directory": ["directory_entry"],
 }
 
 
 @pytest.mark.parametrize(
     "obj_type", MAIN_TABLES.keys(),
 )
 @pytest.mark.parametrize("max_rows", (None, 1, 2, 10000))
 def test_export_related_files(max_rows, obj_type, tmpdir):
     config = {"orc": {}}
     if max_rows is not None:
         config["orc"]["max_rows"] = {obj_type: max_rows}
     exporter({obj_type: TEST_OBJECTS[obj_type]}, config=config, tmpdir=tmpdir)
     # check there are as many ORC files as objects
     orcfiles = [fname for fname in (tmpdir / obj_type).listdir(f"{obj_type}-*.orc")]
     if max_rows is None:
         assert len(orcfiles) == 1
     else:
         assert len(orcfiles) == math.ceil(len(TEST_OBJECTS[obj_type]) / max_rows)
     # check the number of related ORC files
     for related in RELATED.get(obj_type, ()):
         related_orcfiles = [
             fname for fname in (tmpdir / related).listdir(f"{related}-*.orc")
         ]
         assert len(related_orcfiles) == len(orcfiles)
 
     # for each ORC file, check related files only reference objects in the
     # corresponding main table
     for orc_file in orcfiles:
         with orc_file.open("rb") as orc_obj:
             reader = pyorc.Reader(
-                orc_obj, converters={pyorc.TypeKind.TIMESTAMP: SWHTimestampConverter},
+                orc_obj,
+                converters={pyorc.TypeKind.TIMESTAMP: orc.SWHTimestampConverter},
             )
             uuid = reader.user_metadata["swh_uuid"].decode()
             assert orc_file.basename == f"{obj_type}-{uuid}.orc"
             rows = list(reader)
             obj_ids = [row[0] for row in rows]
 
         # check the related tables
         for related in RELATED.get(obj_type, ()):
             orc_file = tmpdir / related / f"{related}-{uuid}.orc"
             with orc_file.open("rb") as orc_obj:
                 reader = pyorc.Reader(
                     orc_obj,
-                    converters={pyorc.TypeKind.TIMESTAMP: SWHTimestampConverter},
+                    converters={pyorc.TypeKind.TIMESTAMP: orc.SWHTimestampConverter},
                 )
                 assert reader.user_metadata["swh_uuid"].decode() == uuid
                 rows = list(reader)
                 # check branches in this file only concern current snapshot (obj_id)
                 for row in rows:
                     assert row[0] in obj_ids
 
 
 @pytest.mark.parametrize(
     "obj_type", MAIN_TABLES.keys(),
 )
 def test_export_related_files_separated(obj_type, tmpdir):
     exporter({obj_type: TEST_OBJECTS[obj_type]}, tmpdir=tmpdir)
     # check there are as many ORC files as objects
     orcfiles = [fname for fname in (tmpdir / obj_type).listdir(f"{obj_type}-*.orc")]
     assert len(orcfiles) == 1
     # check related ORC files are in their own directory
     for related in RELATED.get(obj_type, ()):
         related_orcfiles = [
             fname for fname in (tmpdir / related).listdir(f"{related}-*.orc")
         ]
         assert len(related_orcfiles) == len(orcfiles)
 
 
 @pytest.mark.parametrize("table_name", RELATION_TABLES.keys())
 def test_export_invalid_max_rows(table_name):
     config = {"orc": {"max_rows": {table_name: 10}}}
     with pytest.raises(ValueError):
         exporter({}, config=config)
+
+
+def test_export_content_with_data(monkeypatch, tmpdir):
+    obj_type = "content"
+    objstorage = get_objstorage("memory")
+    for content in TEST_OBJECTS[obj_type]:
+        objstorage.add(content.data)
+
+    def get_objstorage_mock(**kw):
+        if kw.get("cls") == "mock":
+            return objstorage
+
+    monkeypatch.setattr(orc, "get_objstorage", get_objstorage_mock)
+    config = {
+        "orc": {"with_data": True, "objstorage": {"cls": "mock"},},
+    }
+
+    output = exporter({obj_type: TEST_OBJECTS[obj_type]}, config=config, tmpdir=tmpdir)
+    for obj in TEST_OBJECTS[obj_type]:
+        assert (
+            orc.hash_to_hex_or_none(obj.sha1),
+            orc.hash_to_hex_or_none(obj.sha1_git),
+            orc.hash_to_hex_or_none(obj.sha256),
+            orc.hash_to_hex_or_none(obj.blake2s256),
+            obj.length,
+            obj.status,
+            obj.data,
+        ) in output[obj_type]