diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -14,6 +14,9 @@
 [mypy-cassandra.*]
 ignore_missing_imports = True
 
+[mypy-confluent_kafka.*]
+ignore_missing_imports = True
+
 # only shipped indirectly via hypothesis
 [mypy-django.*]
 ignore_missing_imports = True
@@ -27,6 +30,9 @@
 [mypy-pytest.*]
 ignore_missing_imports = True
 
+[mypy-pytest_kafka.*]
+ignore_missing_imports = True
+
 [mypy-systemd.daemon.*]
 ignore_missing_imports = True
diff --git a/swh/storage/cli.py b/swh/storage/cli.py
--- a/swh/storage/cli.py
+++ b/swh/storage/cli.py
@@ -3,13 +3,21 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+import functools
 import logging
 
 import click
 
 from swh.core.cli import CONTEXT_SETTINGS
+from swh.journal.cli import get_journal_client
+from swh.storage import get_storage
 from swh.storage.api.server import load_and_check_config, app
+
+try:
+    from systemd.daemon import notify
+except ImportError:
+    notify = None
+
 
 @click.group(name="storage", context_settings=CONTEXT_SETTINGS)
 @click.pass_context
@@ -103,6 +111,47 @@
     ctx.exit(0)
 
 
+@storage.command()
+@click.option(
+    "--stop-after-objects",
+    "-n",
+    default=None,
+    type=int,
+    help="Stop after processing this many objects. Default is to run forever.",
+)
+@click.pass_context
+def replay(ctx, stop_after_objects):
+    """Fill a Storage by reading a Journal.
+
+    There can be several 'replayers' filling a Storage as long as they use
+    the same `group-id`.
+    """
+    from swh.storage.replay import process_replay_objects
+
+    conf = ctx.obj["config"]
+    try:
+        storage = get_storage(**conf.pop("storage"))
+    except KeyError:
+        ctx.fail("You must have a storage configured in your config file.")
+
+    client = get_journal_client(ctx, stop_after_objects=stop_after_objects)
+    worker_fn = functools.partial(process_replay_objects, storage=storage)
+
+    if notify:
+        notify("READY=1")
+
+    try:
+        client.process(worker_fn)
+    except KeyboardInterrupt:
+        ctx.exit(0)
+    else:
+        print("Done.")
+    finally:
+        if notify:
+            notify("STOPPING=1")
+        client.close()
+
+
 def main():
     logging.basicConfig()
     return serve(auto_envvar_prefix="SWH_STORAGE")
diff --git a/swh/storage/fixer.py b/swh/storage/fixer.py
new file mode
--- /dev/null
+++ b/swh/storage/fixer.py
@@ -0,0 +1,293 @@
+import copy
+import logging
+from typing import Any, Dict, List, Optional
+from swh.model.identifiers import normalize_timestamp
+
+logger = logging.getLogger(__name__)
+
+
+def _fix_content(content: Dict[str, Any]) -> Dict[str, Any]:
+    """Filters out the invalid 'perms' key that leaked from swh.model.from_disk
+    to the journal.
+
+    >>> _fix_content({'perms': 0o100644, 'sha1_git': b'foo'})
+    {'sha1_git': b'foo'}
+
+    >>> _fix_content({'sha1_git': b'bar'})
+    {'sha1_git': b'bar'}
+
+    """
+    content = content.copy()
+    content.pop("perms", None)
+    return content
+
+
+def _fix_revision_pypi_empty_string(rev):
+    """PyPI loader failed to encode empty strings as bytes, see:
+    swh:1:rev:8f0095ee0664867055d03de9bcc8f95b91d8a2b9
+    or https://forge.softwareheritage.org/D1772
+    """
+    rev = {
+        **rev,
+        "author": rev["author"].copy(),
+        "committer": rev["committer"].copy(),
+    }
+    if rev["author"].get("email") == "":
+        rev["author"]["email"] = b""
+    if rev["author"].get("name") == "":
+        rev["author"]["name"] = b""
+    if rev["committer"].get("email") == "":
+        rev["committer"]["email"] = b""
+    if rev["committer"].get("name") == "":
+        rev["committer"]["name"] = b""
+    return rev
+
+
+def _fix_revision_transplant_source(rev):
+    if rev.get("metadata") and rev["metadata"].get("extra_headers"):
+        rev = copy.deepcopy(rev)
+        rev["metadata"]["extra_headers"] = [
+            [key, value.encode("ascii")]
+            if key == "transplant_source" and isinstance(value, str)
+            else [key, value]
+            for (key, value) in rev["metadata"]["extra_headers"]
+        ]
+    return rev
+
+
+def _check_date(date):
+    """Returns whether the date can be represented in backends with sane
+    limits on timestamps and timezones (resp. signed 64 bits and signed 16 bits),
+    and whether the microseconds are valid (i.e. between 0 and 10^6).
+    """
+    if date is None:
+        return True
+    date = normalize_timestamp(date)
+    return (
+        (-(2 ** 63) <= date["timestamp"]["seconds"] < 2 ** 63)
+        and (0 <= date["timestamp"]["microseconds"] < 10 ** 6)
+        and (-(2 ** 15) <= date["offset"] < 2 ** 15)
+    )
+
+
+def _check_revision_date(rev):
+    """Exclude revisions with invalid dates.
+    See https://forge.softwareheritage.org/T1339"""
+    return _check_date(rev["date"]) and _check_date(rev["committer_date"])
+
+
+def _fix_revision(revision: Dict[str, Any]) -> Optional[Dict]:
+    """Fix various legacy revision issues.
+
+    Fix author/committer person:
+
+    >>> from pprint import pprint
+    >>> date = {
+    ...     'timestamp': {
+    ...         'seconds': 1565096932,
+    ...         'microseconds': 0,
+    ...     },
+    ...     'offset': 0,
+    ... }
+    >>> rev0 = _fix_revision({
+    ...     'id': b'rev-id',
+    ...     'author': {'fullname': b'', 'name': '', 'email': ''},
+    ...     'committer': {'fullname': b'', 'name': '', 'email': ''},
+    ...     'date': date,
+    ...     'committer_date': date,
+    ...     'type': 'git',
+    ...     'message': '',
+    ...     'directory': b'dir-id',
+    ...     'synthetic': False,
+    ... })
+    >>> rev0['author']
+    {'fullname': b'', 'name': b'', 'email': b''}
+    >>> rev0['committer']
+    {'fullname': b'', 'name': b'', 'email': b''}
+
+    Fix type of 'transplant_source' extra headers:
+
+    >>> rev1 = _fix_revision({
+    ...     'id': b'rev-id',
+    ...     'author': {'fullname': b'', 'name': '', 'email': ''},
+    ...     'committer': {'fullname': b'', 'name': '', 'email': ''},
+    ...     'date': date,
+    ...     'committer_date': date,
+    ...     'metadata': {
+    ...         'extra_headers': [
+    ...             ['time_offset_seconds', b'-3600'],
+    ...             ['transplant_source', '29c154a012a70f49df983625090434587622b39e']
+    ...     ]},
+    ...     'type': 'git',
+    ...     'message': '',
+    ...     'directory': b'dir-id',
+    ...     'synthetic': False,
+    ... })
+    >>> pprint(rev1['metadata']['extra_headers'])
+    [['time_offset_seconds', b'-3600'],
+     ['transplant_source', b'29c154a012a70f49df983625090434587622b39e']]
+
+    Revisions with invalid dates are filtered out:
+
+    >>> from copy import deepcopy
+    >>> invalid_date1 = deepcopy(date)
+    >>> invalid_date1['timestamp']['microseconds'] = 1000000000  # > 10^6
+    >>> rev = _fix_revision({
+    ...     'author': {'fullname': b'', 'name': '', 'email': ''},
+    ...     'committer': {'fullname': b'', 'name': '', 'email': ''},
+    ...     'date': invalid_date1,
+    ...     'committer_date': date,
+    ... })
+    >>> rev is None
+    True
+
+    >>> invalid_date2 = deepcopy(date)
+    >>> invalid_date2['timestamp']['seconds'] = 2**70  # > 2^63
+    >>> rev = _fix_revision({
+    ...     'author': {'fullname': b'', 'name': '', 'email': ''},
+    ...     'committer': {'fullname': b'', 'name': '', 'email': ''},
+    ...     'date': invalid_date2,
+    ...     'committer_date': date,
+    ... })
+    >>> rev is None
+    True
+
+    >>> invalid_date3 = deepcopy(date)
+    >>> invalid_date3['offset'] = 2**20  # > 2^15
+    >>> rev = _fix_revision({
+    ...     'author': {'fullname': b'', 'name': '', 'email': ''},
+    ...     'committer': {'fullname': b'', 'name': '', 'email': ''},
+    ...     'date': date,
+    ...     'committer_date': invalid_date3,
+    ... })
+    >>> rev is None
+    True
+
+    """  # noqa
+    rev = _fix_revision_pypi_empty_string(revision)
+    rev = _fix_revision_transplant_source(rev)
+    if not _check_revision_date(rev):
+        logger.warning(
+            "Invalid revision date detected: %(revision)s", {"revision": rev}
+        )
+        return None
+    return rev
+
+
+def _fix_origin(origin: Dict) -> Dict:
+    """Fix legacy origins: drop the 'type' key, which is no longer in the model.
+
+    >>> from pprint import pprint
+    >>> pprint(_fix_origin({
+    ...     'url': 'http://foo',
+    ... }))
+    {'url': 'http://foo'}
+    >>> pprint(_fix_origin({
+    ...     'url': 'http://bar',
+    ...     'type': 'foo',
+    ... }))
+    {'url': 'http://bar'}
+
+    """
+    o = origin.copy()
+    o.pop("type", None)
+    return o
+
+
+def _fix_origin_visit(visit: Dict) -> Dict:
+    """Fix various legacy origin visit issues.
+
+    `visit['origin']` is a dict instead of a URL:
+
+    >>> from datetime import datetime, timezone
+    >>> from pprint import pprint
+    >>> date = datetime(2020, 2, 27, 14, 39, 19, tzinfo=timezone.utc)
+    >>> pprint(_fix_origin_visit({
+    ...     'origin': {'url': 'http://foo'},
+    ...     'date': date,
+    ...     'type': 'git',
+    ...     'status': 'ongoing',
+    ...     'snapshot': None,
+    ... }))
+    {'date': datetime.datetime(2020, 2, 27, 14, 39, 19, tzinfo=datetime.timezone.utc),
+     'metadata': None,
+     'origin': 'http://foo',
+     'snapshot': None,
+     'status': 'ongoing',
+     'type': 'git'}
+
+    `visit['type']` is missing, but `visit['origin']['type']` exists:
+
+    >>> pprint(_fix_origin_visit(
+    ...     {'origin': {'type': 'hg', 'url': 'http://foo'},
+    ...      'date': date,
+    ...      'status': 'ongoing',
+    ...      'snapshot': None,
+    ... }))
+    {'date': datetime.datetime(2020, 2, 27, 14, 39, 19, tzinfo=datetime.timezone.utc),
+     'metadata': None,
+     'origin': 'http://foo',
+     'snapshot': None,
+     'status': 'ongoing',
+     'type': 'hg'}
+
+    Old visit format (origin_visit with no type) raises:
+
+    >>> _fix_origin_visit({
+    ...     'origin': {'url': 'http://foo'},
+    ...     'date': date,
+    ...     'status': 'ongoing',
+    ...     'snapshot': None
+    ... })
+    Traceback (most recent call last):
+    ...
+    ValueError: Old origin visit format detected...
+
+    >>> _fix_origin_visit({
+    ...     'origin': 'http://foo',
+    ...     'date': date,
+    ...     'status': 'ongoing',
+    ...     'snapshot': None
+    ... })
+    Traceback (most recent call last):
+    ...
+    ValueError: Old origin visit format detected...
+
+    """  # noqa
+    visit = visit.copy()
+    if "type" not in visit:
+        if isinstance(visit["origin"], dict) and "type" in visit["origin"]:
+            # Very old version of the schema: visits did not have a type,
+            # but their 'origin' field was a dict with a 'type' key.
+            visit["type"] = visit["origin"]["type"]
+        else:
+            # Very old schema version: 'type' is missing, stop early
+
+            # We expect the journal's origin_visit topic to no longer reference
+            # such visits. If it does, the replayer must crash so we can fix
+            # the journal's topic.
+            raise ValueError(f"Old origin visit format detected: {visit}")
+    if isinstance(visit["origin"], dict):
+        # Old version of the schema: visit['origin'] was a dict.
+        visit["origin"] = visit["origin"]["url"]
+    if "metadata" not in visit:
+        visit["metadata"] = None
+    return visit
+
+
+def fix_objects(object_type: str, objects: List[Dict]) -> List[Dict]:
+    """
+    Fix legacy objects from the journal to bring them up to date with the
+    latest storage schema.
+    """
+    if object_type == "content":
+        return [_fix_content(v) for v in objects]
+    elif object_type == "revision":
+        revisions = [_fix_revision(v) for v in objects]
+        return [rev for rev in revisions if rev is not None]
+    elif object_type == "origin":
+        return [_fix_origin(v) for v in objects]
+    elif object_type == "origin_visit":
+        return [_fix_origin_visit(v) for v in objects]
+    else:
+        return objects
diff --git a/swh/storage/replay.py b/swh/storage/replay.py
new file mode
--- /dev/null
+++ b/swh/storage/replay.py
@@ -0,0 +1,128 @@
+# Copyright (C) 2019-2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import logging
+from typing import Any, Callable, Dict, Iterable, List
+
+try:
+    from systemd.daemon import notify
+except ImportError:
+    notify = None
+
+from swh.core.statsd import statsd
+from swh.storage.fixer import fix_objects
+
+from swh.model.model import (
+    BaseContent,
+    BaseModel,
+    Content,
+    Directory,
+    Origin,
+    OriginVisit,
+    Revision,
+    SkippedContent,
+    Snapshot,
+    Release,
+)
+from swh.storage.exc import HashCollision
+
+logger = logging.getLogger(__name__)
+
+GRAPH_OPERATIONS_METRIC = "swh_graph_replayer_operations_total"
+GRAPH_DURATION_METRIC = "swh_graph_replayer_duration_seconds"
+
+
+object_converter_fn: Dict[str, Callable[[Dict], BaseModel]] = {
+    "origin": Origin.from_dict,
+    "origin_visit": OriginVisit.from_dict,
+    "snapshot": Snapshot.from_dict,
+    "revision": Revision.from_dict,
+    "release": Release.from_dict,
+    "directory": Directory.from_dict,
+    "content": Content.from_dict,
+    "skipped_content": SkippedContent.from_dict,
+}
+
+
+def process_replay_objects(all_objects, *, storage):
+    for (object_type, objects) in all_objects.items():
+        logger.debug("Inserting %s %s objects", len(objects), object_type)
+        with statsd.timed(GRAPH_DURATION_METRIC, tags={"object_type": object_type}):
+            _insert_objects(object_type, objects, storage)
+        statsd.increment(
+            GRAPH_OPERATIONS_METRIC, len(objects), tags={"object_type": object_type}
+        )
+    if notify:
+        notify("WATCHDOG=1")
+
+
+def collision_aware_content_add(
+    content_add_fn: Callable[[Iterable[Any]], None], contents: List[BaseContent]
+) -> None:
+    """Add contents to the storage. If a hash collision is detected, an error
+    is logged; the remaining, non-colliding contents are then added to the storage.
+
+    Args:
+        content_add_fn: Storage content callable
+        contents: List of contents or skipped contents to add to storage
+
+    """
+    if not contents:
+        return
+    colliding_content_hashes: List[Dict[str, Any]] = []
+    while True:
+        try:
+            content_add_fn(contents)
+        except HashCollision as e:
+            colliding_content_hashes.append(
+                {
+                    "algo": e.algo,
+                    "hash": e.hash_id,  # hex hash id
+                    "objects": e.colliding_contents,  # hex hashes
+                }
+            )
+            colliding_hashes = e.colliding_content_hashes()
+            # Drop the colliding contents from the transaction
+            contents = [c for c in contents if c.hashes() not in colliding_hashes]
+        else:
+            # Successfully added contents, we are done
+            break
+    if colliding_content_hashes:
+        for collision in colliding_content_hashes:
+            logger.error("Collision detected: %(collision)s", {"collision": collision})
+
+
+def _insert_objects(object_type: str, objects: List[Dict], storage) -> None:
+    """Insert objects of type object_type in the storage.
+
+    """
+    objects = fix_objects(object_type, objects)
+
+    if object_type == "content":
+        contents: List[BaseContent] = []
+        skipped_contents: List[BaseContent] = []
+        for content in objects:
+            c = BaseContent.from_dict(content)
+            if isinstance(c, SkippedContent):
+                skipped_contents.append(c)
+            else:
+                contents.append(c)
+
+        collision_aware_content_add(storage.skipped_content_add, skipped_contents)
+        collision_aware_content_add(storage.content_add_metadata, contents)
+    elif object_type == "origin_visit":
+        visits: List[OriginVisit] = []
+        origins: List[Origin] = []
+        for obj in objects:
+            visit = OriginVisit.from_dict(obj)
+            visits.append(visit)
+            origins.append(Origin(url=visit.origin))
+        storage.origin_add(origins)
+        storage.origin_visit_upsert(visits)
+    elif object_type in ("directory", "revision", "release", "snapshot", "origin"):
+        method = getattr(storage, object_type + "_add")
+        method(object_converter_fn[object_type](o) for o in objects)
+    else:
+        logger.warning("Received a series of %s, this should not happen", object_type)
diff --git a/swh/storage/tests/conftest.py b/swh/storage/tests/conftest.py
--- a/swh/storage/tests/conftest.py
+++ b/swh/storage/tests/conftest.py
@@ -19,7 +19,6 @@
 import swh.storage
 from swh.core.utils import numfile_sortkey as sortkey
-
 from swh.model.tests.generate_testdata import gen_contents, gen_origins
 from swh.model.model import (
     Content,
@@ -45,7 +44,6 @@
     "snapshot": Snapshot.from_dict,
 }
 
-
 SQL_DIR = path.join(path.dirname(swh.storage.__file__), "sql")
 
 environ["LC_ALL"] = "C.UTF-8"
diff --git a/swh/storage/tests/test_replay.py b/swh/storage/tests/test_replay.py
new file mode
--- /dev/null
+++ b/swh/storage/tests/test_replay.py
@@ -0,0 +1,155 @@
+# Copyright (C) 2019-2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import datetime
+import functools
+
+from typing import Dict, List
+
+import pytest
+
+from swh.storage import get_storage
+
+from swh.storage.replay import process_replay_objects
+
+from swh.journal.tests.utils import MockedJournalClient, MockedKafkaWriter
+
+
+storage_config = {"cls": "pipeline", "steps": [{"cls": "memory"},]}
+
+
+def make_topic(kafka_prefix: str, object_type: str) -> str:
+    return kafka_prefix + "." + object_type
+
+
+def _test_write_replay_origin_visit(visits: List[Dict]):
+    """Helper function to write tests for origin_visit.
+
+    Each visit (a dict) given in the 'visits' argument is sent to a
+    (mocked) Kafka queue, to which an in-memory-storage-backed replayer
+    is listening.
+
+    Check that the corresponding origin visit entities are present in the
+    storage and have correct values if they are not skipped.
+
+    """
+    queue: List = []
+    replayer = MockedJournalClient(queue)
+    writer = MockedKafkaWriter(queue)
+
+    # Note that flipping the order of these two insertions will crash
+    # the test, because the legacy origin format does not allow creating
+    # the origin when needed (its type is missing)
+    writer.send(
+        "origin",
+        "foo",
+        {
+            "url": "http://example.com/",
+            "type": "git",  # test the legacy origin format is accepted
+        },
+    )
+    for visit in visits:
+        writer.send("origin_visit", "foo", visit)
+
+    queue_size = len(queue)
+    assert replayer.stop_after_objects is None
+    replayer.stop_after_objects = queue_size
+
+    storage = get_storage(**storage_config)
+    worker_fn = functools.partial(process_replay_objects, storage=storage)
+
+    replayer.process(worker_fn)
+
+    actual_visits = list(storage.origin_visit_get("http://example.com/"))
+
+    assert len(actual_visits) == len(visits), actual_visits
+
+    for vin, vout in zip(visits, actual_visits):
+        vin = vin.copy()
+        vout = vout.copy()
+        assert vout.pop("origin") == "http://example.com/"
+        vin.pop("origin")
+        vin.setdefault("type", "git")
+        vin.setdefault("metadata", None)
+        assert vin == vout
+
+
+def test_write_replay_origin_visit():
+    """Test origin_visit when the 'origin' is just a string."""
+    now = datetime.datetime.now()
+    visits = [
+        {
+            "visit": 1,
+            "origin": "http://example.com/",
+            "date": now,
+            "type": "git",
+            "status": "partial",
+            "snapshot": None,
+        }
+    ]
+    _test_write_replay_origin_visit(visits)
+
+
+def test_write_replay_legacy_origin_visit1():
+    """An origin_visit with no type should make the replayer crash
+
+    We expect the journal's origin_visit topic to no longer reference such
+    visits. If it does, the replayer must crash so we can fix the journal's
+    topic.
+
+    """
+    now = datetime.datetime.now()
+    visit = {
+        "visit": 1,
+        "origin": "http://example.com/",
+        "date": now,
+        "status": "partial",
+        "snapshot": None,
+    }
+    now2 = datetime.datetime.now()
+    visit2 = {
+        "visit": 2,
+        "origin": {"url": "http://example.com/"},
+        "date": now2,
+        "status": "partial",
+        "snapshot": None,
+    }
+
+    for origin_visit in [visit, visit2]:
+        with pytest.raises(ValueError, match="Old origin visit format"):
+            _test_write_replay_origin_visit([origin_visit])
+
+
+def test_write_replay_legacy_origin_visit2():
+    """Test origin_visit when 'type' is missing from the visit, but not
+    from the origin."""
+    now = datetime.datetime.now()
+    visits = [
+        {
+            "visit": 1,
+            "origin": {"url": "http://example.com/", "type": "git",},
+            "date": now,
+            "type": "git",
+            "status": "partial",
+            "snapshot": None,
+        }
+    ]
+    _test_write_replay_origin_visit(visits)
+
+
+def test_write_replay_legacy_origin_visit3():
+    """Test origin_visit when the origin is a dict"""
+    now = datetime.datetime.now()
+    visits = [
+        {
+            "visit": 1,
+            "origin": {"url": "http://example.com/",},
+            "date": now,
+            "type": "git",
+            "status": "partial",
+            "snapshot": None,
+        }
+    ]
+    _test_write_replay_origin_visit(visits)
diff --git a/swh/storage/tests/test_write_replay.py b/swh/storage/tests/test_write_replay.py
new file mode
--- /dev/null
+++ b/swh/storage/tests/test_write_replay.py
@@ -0,0 +1,166 @@
+# Copyright (C) 2019-2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import functools
+from unittest.mock import patch
+
+import attr
+from hypothesis import given, settings, HealthCheck
+from hypothesis.strategies import lists
+
+from swh.model.hypothesis_strategies import present_contents
+from swh.model.hypothesis_strategies import object_dicts
+from swh.model.model import Origin
+from swh.storage import get_storage
+from swh.storage.exc import HashCollision
+
+from swh.journal.replay import (
+    process_replay_objects,
+    process_replay_objects_content,
+    object_converter_fn,
+)
+
+from swh.journal.tests.utils import MockedJournalClient, MockedKafkaWriter
+
+
+storage_config = {
+    "cls": "memory",
+    "journal_writer": {"cls": "memory"},
+}
+
+
+def empty_person_name_email(rev_or_rel):
+    """Empties the 'name' and 'email' fields of the author and committer of a
+    revision or release, leaving only the fullname."""
+    if getattr(rev_or_rel, "author", None):
+        rev_or_rel = attr.evolve(
+            rev_or_rel, author=attr.evolve(rev_or_rel.author, name=b"", email=b"",)
+        )
+
+    if getattr(rev_or_rel, "committer", None):
+        rev_or_rel = attr.evolve(
+            rev_or_rel,
+            committer=attr.evolve(rev_or_rel.committer, name=b"", email=b"",),
+        )
+
+    return rev_or_rel
+
+
+@given(lists(object_dicts(), min_size=1))
+@settings(suppress_health_check=[HealthCheck.too_slow])
+def test_write_replay_same_order_batches(objects):
+    queue = []
+    replayer = MockedJournalClient(queue)
+
+    with patch(
+        "swh.journal.writer.inmemory.InMemoryJournalWriter",
+        return_value=MockedKafkaWriter(queue),
+    ):
+        storage1 = get_storage(**storage_config)
+
+    # Write objects to storage1
+    for (obj_type, obj) in objects:
+        if obj_type == "content" and obj.get("status") == "absent":
+            obj_type = "skipped_content"
+
+        obj = object_converter_fn[obj_type](obj)
+
+        if obj_type == "origin_visit":
+            storage1.origin_add_one(Origin(url=obj.origin))
+            storage1.origin_visit_upsert([obj])
+        else:
+            method = getattr(storage1, obj_type + "_add")
+            try:
+                method([obj])
+            except HashCollision:
+                pass
+
+    # Bail out early if we didn't insert any relevant objects...
+    queue_size = len(queue)
+    assert queue_size != 0, "No test objects found; hypothesis strategy bug?"
+
+    assert replayer.stop_after_objects is None
+    replayer.stop_after_objects = queue_size
+
+    storage2 = get_storage(**storage_config)
+    worker_fn = functools.partial(process_replay_objects, storage=storage2)
+
+    replayer.process(worker_fn)
+
+    assert replayer.consumer.committed
+
+    for attr_name in (
+        "_contents",
+        "_directories",
+        "_snapshots",
+        "_origin_visits",
+        "_origins",
+    ):
+        assert getattr(storage1, attr_name) == getattr(storage2, attr_name), attr_name
+
+    # When hypothesis generates a revision and a release with the same
+    # author (or committer) fullname but a different name or email, the
+    # storage will use the first name/email it sees.
+    # This first one will be either the one from the revision or the release,
+    # and since there are no ordering guarantees, storage2 has a 1/2 chance of
+    # not seeing the same order as storage1; therefore we need to strip
+    # them out before comparing.
+    for attr_name in ("_revisions", "_releases"):
+        items1 = {
+            k: empty_person_name_email(v)
+            for (k, v) in getattr(storage1, attr_name).items()
+        }
+        items2 = {
+            k: empty_person_name_email(v)
+            for (k, v) in getattr(storage2, attr_name).items()
+        }
+        assert items1 == items2, attr_name
+
+
+# TODO: add test for hash collision
+
+
+@given(lists(present_contents(), min_size=1))
+@settings(suppress_health_check=[HealthCheck.too_slow])
+def test_write_replay_content(objects):
+
+    queue = []
+    replayer = MockedJournalClient(queue)
+
+    with patch(
+        "swh.journal.writer.inmemory.InMemoryJournalWriter",
+        return_value=MockedKafkaWriter(queue),
+    ):
+        storage1 = get_storage(**storage_config)
+
+    contents = []
+    for obj in objects:
+        storage1.content_add([obj])
+        contents.append(obj)
+
+    # Bail out early if we didn't insert any relevant objects...
+    queue_size = len(queue)
+    assert queue_size != 0, "No test objects found; hypothesis strategy bug?"
+
+    assert replayer.stop_after_objects is None
+    replayer.stop_after_objects = queue_size
+
+    storage2 = get_storage(**storage_config)
+
+    objstorage1 = storage1.objstorage.objstorage
+    objstorage2 = storage2.objstorage.objstorage
+
+    worker_fn = functools.partial(
+        process_replay_objects_content, src=objstorage1, dst=objstorage2
+    )
+
+    replayer.process(worker_fn)
+
+    # Only contents with status 'visible' are copied to storage2
+    expected_objstorage_state = {
+        c.sha1: c.data for c in contents if c.status == "visible"
+    }
+
+    assert expected_objstorage_state == objstorage2.state
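
As a usage sketch, the snippet below mirrors what the new `replay` subcommand in swh/storage/cli.py wires together: build a storage, build a journal client, and feed each batch of journal objects to process_replay_objects. It is illustrative only; the broker address, group id and topic prefix are placeholders, and constructing swh.journal.client.JournalClient directly (instead of going through the click-based get_journal_client helper used by the CLI) is an assumption about that client's constructor, not something this diff adds.

    import functools

    from swh.journal.client import JournalClient  # assumed import path and arguments
    from swh.storage import get_storage
    from swh.storage.replay import process_replay_objects

    # Any configured storage backend works; the tests use an in-memory one.
    storage = get_storage(cls="memory")

    client = JournalClient(
        brokers=["localhost:9092"],    # placeholder Kafka broker
        group_id="replayer-test",      # replayers sharing a group-id share the work
        prefix="swh.journal.objects",  # placeholder topic prefix
        stop_after_objects=1000,       # None means run forever
    )

    # process_replay_objects fixes legacy objects (swh/storage/fixer.py) and
    # inserts them into the storage, one batch of journal objects at a time.
    worker_fn = functools.partial(process_replay_objects, storage=storage)
    client.process(worker_fn)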