diff --git a/swh/journal/serializers.py b/swh/journal/serializers.py
index 1af8cdf..4fc4a71 100644
--- a/swh/journal/serializers.py
+++ b/swh/journal/serializers.py
@@ -1,90 +1,90 @@
 # Copyright (C) 2016-2017 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from typing import Any, Dict, Union, overload

 import msgpack

 from swh.core.api.serializers import msgpack_dumps, msgpack_loads
 from swh.model.hashutil import DEFAULT_ALGORITHMS
 from swh.model.model import (
     Content,
     Directory,
     Origin,
     OriginVisit,
     Release,
     Revision,
     SkippedContent,
     Snapshot,
 )

 ModelObject = Union[
     Content, Directory, Origin, OriginVisit, Release, Revision, SkippedContent, Snapshot
 ]

 KeyType = Union[Dict[str, str], Dict[str, bytes], bytes]


 # these @overload'ed versions of the object_key method aim at helping mypy figuring
 # the correct type-ing.
 @overload
 def object_key(
     object_type: str, object_: Union[Content, Directory, Revision, Release, Snapshot]
 ) -> bytes:
     ...


 @overload
 def object_key(
     object_type: str, object_: Union[Origin, SkippedContent]
 ) -> Dict[str, bytes]:
     ...


 @overload
 def object_key(object_type: str, object_: OriginVisit) -> Dict[str, str]:
     ...


 def object_key(object_type: str, object_) -> KeyType:
     if object_type in ("revision", "release", "directory", "snapshot"):
         return object_.id
     elif object_type == "content":
         return object_.sha1  # TODO: use a dict of hashes
     elif object_type == "skipped_content":
         return {hash: getattr(object_, hash) for hash in DEFAULT_ALGORITHMS}
     elif object_type == "origin":
         return {"url": object_.url}
     elif object_type == "origin_visit":
         return {
             "origin": object_.origin,
             "date": str(object_.date),
         }
     else:
         raise ValueError("Unknown object type: %s." % object_type)


 def key_to_kafka(key: KeyType) -> bytes:
     """Serialize a key, possibly a dict, in a predictable way"""
     p = msgpack.Packer(use_bin_type=True)
     if isinstance(key, dict):
         return p.pack_map_pairs(sorted(key.items()))
     else:
         return p.pack(key)


 def kafka_to_key(kafka_key: bytes) -> KeyType:
     """Deserialize a key"""
-    return msgpack.loads(kafka_key)
+    return msgpack.loads(kafka_key, raw=False, strict_map_key=False)


 def value_to_kafka(value: Any) -> bytes:
     """Serialize some data for storage in kafka"""
     return msgpack_dumps(value)


 def kafka_to_value(kafka_value: bytes) -> Any:
     """Deserialize some data stored in kafka"""
     return msgpack_loads(kafka_value)
diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py
index 1701ff9..e7169e2 100644
--- a/swh/journal/tests/conftest.py
+++ b/swh/journal/tests/conftest.py
@@ -1,358 +1,358 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import datetime
 import os
 import pytest
 import logging
 import random
 import string

 from confluent_kafka import Consumer
 from confluent_kafka.admin import AdminClient, ConfigResource

 from hypothesis.strategies import one_of
 from subprocess import Popen
-from typing import Any, Dict, Iterator, List, Optional, Tuple
+from typing import Any, Dict, Iterator, List, Tuple

 from pathlib import Path
 from pytest_kafka import (
     make_zookeeper_process,
     make_kafka_server,
     KAFKA_SERVER_CONFIG_TEMPLATE,
     ZOOKEEPER_CONFIG_TEMPLATE,
 )

 from swh.model import hypothesis_strategies as strategies
 from swh.model.hashutil import MultiHash, hash_to_bytes

 from swh.journal.serializers import ModelObject
 from swh.journal.writer.kafka import OBJECT_TYPES

 logger = logging.getLogger(__name__)

 CONTENTS = [
     {**MultiHash.from_data(b"foo").digest(), "length": 3, "status": "visible",},
 ]

 duplicate_content1 = {
     "length": 4,
     "sha1": hash_to_bytes("44973274ccef6ab4dfaaf86599792fa9c3fe4689"),
     "sha1_git": b"another-foo",
     "blake2s256": b"another-bar",
     "sha256": b"another-baz",
     "status": "visible",
 }

 # Craft a sha1 collision
 duplicate_content2 = duplicate_content1.copy()
 sha1_array = bytearray(duplicate_content1["sha1_git"])
 sha1_array[0] += 1
 duplicate_content2["sha1_git"] = bytes(sha1_array)

 DUPLICATE_CONTENTS = [duplicate_content1, duplicate_content2]

 COMMITTERS = [
     {"fullname": b"foo", "name": b"foo", "email": b"",},
     {"fullname": b"bar", "name": b"bar", "email": b"",},
 ]

 DATES = [
     {
         "timestamp": {"seconds": 1234567891, "microseconds": 0,},
         "offset": 120,
         "negative_utc": False,
     },
     {
         "timestamp": {"seconds": 1234567892, "microseconds": 0,},
         "offset": 120,
         "negative_utc": False,
     },
 ]

 REVISIONS = [
     {
         "id": hash_to_bytes("7026b7c1a2af56521e951c01ed20f255fa054238"),
         "message": b"hello",
         "date": DATES[0],
         "committer": COMMITTERS[0],
         "author": COMMITTERS[0],
         "committer_date": DATES[0],
         "type": "git",
         "directory": b"\x01" * 20,
         "synthetic": False,
         "metadata": None,
         "parents": [],
     },
     {
         "id": hash_to_bytes("368a48fe15b7db2383775f97c6b247011b3f14f4"),
         "message": b"hello again",
         "date": DATES[1],
         "committer": COMMITTERS[1],
         "author": COMMITTERS[1],
         "committer_date": DATES[1],
         "type": "hg",
         "directory": b"\x02" * 20,
         "synthetic": False,
         "metadata": None,
         "parents": [],
     },
 ]

 RELEASES = [
     {
         "id": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"),
         "name": b"v0.0.1",
         "date": {
             "timestamp": {"seconds": 1234567890, "microseconds": 0,},
             "offset": 120,
             "negative_utc": False,
         },
         "author": COMMITTERS[0],
         "target_type": "revision",
         "target": b"\x04" * 20,
         "message": b"foo",
         "synthetic": False,
     },
 ]

 ORIGINS = [
     {"url": "https://somewhere.org/den/fox",},
     {"url": "https://overtherainbow.org/fox/den",},
 ]

 ORIGIN_VISITS = [
     {
         "origin": ORIGINS[0]["url"],
         "date": "2013-05-07 04:20:39.369271+00:00",
         "snapshot": None,  # TODO
         "status": "ongoing",  # TODO
         "metadata": {"foo": "bar"},
         "type": "git",
     },
     {
         "origin": ORIGINS[0]["url"],
         "date": "2018-11-27 17:20:39+00:00",
         "snapshot": None,  # TODO
         "status": "ongoing",  # TODO
         "metadata": {"baz": "qux"},
         "type": "git",
     },
 ]

-TEST_OBJECT_DICTS: Dict[str, Tuple[Optional[str], List[Dict[str, Any]]]] = {
-    "content": ("sha1", CONTENTS),
-    "revision": ("id", REVISIONS),
-    "release": ("id", RELEASES),
-    "origin": (None, ORIGINS),
-    "origin_visit": (None, ORIGIN_VISITS),
+TEST_OBJECT_DICTS: Dict[str, List[Dict[str, Any]]] = {
+    "content": CONTENTS,
+    "revision": REVISIONS,
+    "release": RELEASES,
+    "origin": ORIGINS,
+    "origin_visit": ORIGIN_VISITS,
 }

 MODEL_OBJECTS = {v: k for (k, v) in OBJECT_TYPES.items()}

 TEST_OBJECTS: Dict[str, List[ModelObject]] = {}
-for object_type, (_, objects) in TEST_OBJECT_DICTS.items():
+for object_type, objects in TEST_OBJECT_DICTS.items():
     converted_objects: List[ModelObject] = []
     model = MODEL_OBJECTS[object_type]

     for (num, obj_d) in enumerate(objects):
         if object_type == "origin_visit":
             obj_d = {**obj_d, "visit": num}
         elif object_type == "content":
             obj_d = {**obj_d, "data": b"", "ctime": datetime.datetime.now()}

         converted_objects.append(model.from_dict(obj_d))

     TEST_OBJECTS[object_type] = converted_objects


 KAFKA_ROOT = os.environ.get("SWH_KAFKA_ROOT")
 KAFKA_ROOT = KAFKA_ROOT if KAFKA_ROOT else os.path.dirname(__file__) + "/kafka"
 if not os.path.exists(KAFKA_ROOT):
     msg = (
         "Development error: %s must exist and target an "
         "existing kafka installation" % KAFKA_ROOT
     )
     raise ValueError(msg)

 KAFKA_SCRIPTS = Path(KAFKA_ROOT) / "bin"

 KAFKA_BIN = str(KAFKA_SCRIPTS / "kafka-server-start.sh")
 ZOOKEEPER_BIN = str(KAFKA_SCRIPTS / "zookeeper-server-start.sh")

 ZK_CONFIG_TEMPLATE = ZOOKEEPER_CONFIG_TEMPLATE + "\nadmin.enableServer=false\n"
 KAFKA_CONFIG_TEMPLATE = KAFKA_SERVER_CONFIG_TEMPLATE + "\nmessage.max.bytes=104857600\n"

 # Those defines fixtures
 zookeeper_proc = make_zookeeper_process(
     ZOOKEEPER_BIN, zk_config_template=ZK_CONFIG_TEMPLATE, scope="session"
 )
 os.environ[
     "KAFKA_LOG4J_OPTS"
 ] = "-Dlog4j.configuration=file:%s/log4j.properties" % os.path.dirname(__file__)
 session_kafka_server = make_kafka_server(
     KAFKA_BIN,
     "zookeeper_proc",
     kafka_config_template=KAFKA_CONFIG_TEMPLATE,
     scope="session",
 )

 kafka_logger = logging.getLogger("kafka")
 kafka_logger.setLevel(logging.WARN)


 @pytest.fixture(scope="function")
 def kafka_prefix():
     """Pick a random prefix for kafka topics on each call"""
     return "".join(random.choice(string.ascii_lowercase) for _ in range(10))


 @pytest.fixture(scope="function")
 def kafka_consumer_group(kafka_prefix: str):
     """Pick a random consumer group for kafka consumers on each call"""
     return "test-consumer-%s" % kafka_prefix


 @pytest.fixture(scope="session")
 def kafka_admin_client(session_kafka_server: Tuple[Popen, int]) -> AdminClient:
     return AdminClient({"bootstrap.servers": "localhost:%s" % session_kafka_server[1]})


 @pytest.fixture(scope="function")
 def kafka_server_config_overrides() -> Dict[str, str]:
     return {}


 @pytest.fixture(scope="function")
 def kafka_server(
     session_kafka_server: Tuple[Popen, int],
     kafka_admin_client: AdminClient,
     kafka_server_config_overrides: Dict[str, str],
 ) -> Iterator[Tuple[Popen, int]]:
     # No overrides, we can just return the original broker connection
     if not kafka_server_config_overrides:
         yield session_kafka_server
         return

     # This is the minimal operation that the kafka_admin_client gives to
     # retrieve the cluster metadata, which we need to get the numeric id of the
     # broker spawned by pytest_kafka.
     metadata = kafka_admin_client.list_topics("__consumer_offsets")
     broker_ids = [str(broker) for broker in metadata.brokers.keys()]
     assert len(broker_ids) == 1, "More than one broker found in the kafka cluster?!"

     # Pull the current broker configuration. describe_configs and alter_configs
     # generate a dict containing one concurrent.future per queried
     # ConfigResource, hence the use of .result()
     broker = ConfigResource("broker", broker_ids[0])
     futures = kafka_admin_client.describe_configs([broker])
     original_config = futures[broker].result()

     # Gather the list of settings we need to change in the broker
     # ConfigResource, and their original values in the to_restore dict
     to_restore = {}
     for key, new_value in kafka_server_config_overrides.items():
         if key not in original_config:
             raise ValueError(f"Cannot override unknown configuration {key}")
         orig_value = original_config[key].value
         if orig_value == new_value:
             continue
         if original_config[key].is_read_only:
             raise ValueError(f"Cannot override read-only configuration {key}")

         broker.set_config(key, new_value)
         to_restore[key] = orig_value

     # to_restore will be empty if all the config "overrides" are equal to the
     # original value. No need to wait for a config alteration if that's the
     # case. The result() will raise a KafkaException if the settings change
     # failed.
     if to_restore:
         futures = kafka_admin_client.alter_configs([broker])
         try:
             futures[broker].result()
         except Exception:
             raise

     yield session_kafka_server

     # Now we can restore the old setting values. Again, the result() will raise
     # a KafkaException if the settings change failed.
     if to_restore:
         for key, orig_value in to_restore.items():
             broker.set_config(key, orig_value)

         futures = kafka_admin_client.alter_configs([broker])
         try:
             futures[broker].result()
         except Exception:
             raise


 TEST_CONFIG = {
     "consumer_id": "swh.journal.consumer",
     "object_types": TEST_OBJECT_DICTS.keys(),
     "stop_after_objects": 1,  # will read 1 object and stop
     "storage": {"cls": "memory", "args": {}},
 }


 @pytest.fixture
 def test_config(kafka_server: Tuple[Popen, int], kafka_prefix: str):
     """Test configuration needed for producer/consumer
     """
     _, port = kafka_server
     return {
         **TEST_CONFIG,
         "brokers": ["127.0.0.1:{}".format(port)],
         "prefix": kafka_prefix + ".swh.journal.objects",
     }


 @pytest.fixture
 def consumer(
     kafka_server: Tuple[Popen, int], test_config: Dict, kafka_consumer_group: str,
 ) -> Consumer:
     """Get a connected Kafka consumer.
     """
     _, kafka_port = kafka_server
     consumer = Consumer(
         {
             "bootstrap.servers": "127.0.0.1:{}".format(kafka_port),
             "auto.offset.reset": "earliest",
             "enable.auto.commit": True,
             "group.id": kafka_consumer_group,
         }
     )

     kafka_topics = [
         "%s.%s" % (test_config["prefix"], object_type)
         for object_type in test_config["object_types"]
     ]
     consumer.subscribe(kafka_topics)

     yield consumer

     consumer.close()


 def objects_d():
     return one_of(
         strategies.origins().map(lambda x: ("origin", x.to_dict())),
         strategies.origin_visits().map(lambda x: ("origin_visit", x.to_dict())),
         strategies.snapshots().map(lambda x: ("snapshot", x.to_dict())),
         strategies.releases().map(lambda x: ("release", x.to_dict())),
         strategies.revisions().map(lambda x: ("revision", x.to_dict())),
         strategies.directories().map(lambda x: ("directory", x.to_dict())),
         strategies.skipped_contents().map(lambda x: ("skipped_content", x.to_dict())),
         strategies.present_contents().map(lambda x: ("content", x.to_dict())),
     )
diff --git a/swh/journal/tests/test_kafka_writer.py b/swh/journal/tests/test_kafka_writer.py
index 98ae63c..e931b15 100644
--- a/swh/journal/tests/test_kafka_writer.py
+++ b/swh/journal/tests/test_kafka_writer.py
@@ -1,148 +1,149 @@
 # Copyright (C) 2018-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from collections import defaultdict

 from confluent_kafka import Consumer, KafkaException
 from subprocess import Popen
 from typing import Tuple

 from swh.storage import get_storage

-from swh.journal.serializers import kafka_to_key, kafka_to_value
+from swh.journal.serializers import object_key, kafka_to_key, kafka_to_value
 from swh.journal.writer.kafka import KafkaJournalWriter

 from swh.model.model import Origin, OriginVisit

 from .conftest import TEST_OBJECTS, TEST_OBJECT_DICTS


 def consume_messages(consumer, kafka_prefix, expected_messages):
     """Consume expected_messages from the consumer;
     Sort them all into a consumed_objects dict"""
     consumed_messages = defaultdict(list)

     fetched_messages = 0
     retries_left = 1000

     while fetched_messages < expected_messages:
         if retries_left == 0:
             raise ValueError("Timed out fetching messages from kafka")

         msg = consumer.poll(timeout=0.01)

         if not msg:
             retries_left -= 1
             continue

         error = msg.error()
         if error is not None:
             if error.fatal():
                 raise KafkaException(error)
             retries_left -= 1
             continue

         fetched_messages += 1
         topic = msg.topic()
         assert topic.startswith(kafka_prefix + "."), "Unexpected topic"
         object_type = topic[len(kafka_prefix + ".") :]

         consumed_messages[object_type].append(
             (kafka_to_key(msg.key()), kafka_to_value(msg.value()))
         )

     return consumed_messages


 def assert_all_objects_consumed(consumed_messages):
     """Check whether all objects from TEST_OBJECT_DICTS have been consumed"""
-    for (object_type, (key_name, objects)) in TEST_OBJECT_DICTS.items():
-        (keys, values) = zip(*consumed_messages[object_type])
-        if key_name:
-            assert list(keys) == [object_[key_name] for object_ in objects]
-        else:
-            pass  # TODO
+    for object_type, known_values in TEST_OBJECT_DICTS.items():
+        known_keys = [object_key(object_type, obj) for obj in TEST_OBJECTS[object_type]]
+
+        (received_keys, received_values) = zip(*consumed_messages[object_type])

         if object_type == "origin_visit":
-            for value in values:
+            for value in received_values:
                 del value["visit"]
         elif object_type == "content":
-            for value in values:
+            for value in received_values:
                 del value["ctime"]

-        for object_ in objects:
-            assert object_ in values
+        for key in known_keys:
+            assert key in received_keys
+
+        for value in known_values:
+            assert value in received_values


 def test_kafka_writer(
     kafka_prefix: str, kafka_server: Tuple[Popen, int], consumer: Consumer
 ):
     kafka_prefix += ".swh.journal.objects"
     writer = KafkaJournalWriter(
         brokers=[f"localhost:{kafka_server[1]}"],
         client_id="kafka_writer",
         prefix=kafka_prefix,
     )

     expected_messages = 0

     for object_type, objects in TEST_OBJECTS.items():
         writer.write_additions(object_type, objects)
         expected_messages += len(objects)

     consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages)
     assert_all_objects_consumed(consumed_messages)


 def test_storage_direct_writer(
     kafka_prefix: str, kafka_server: Tuple[Popen, int], consumer: Consumer
 ):
     kafka_prefix += ".swh.journal.objects"
     writer_config = {
         "cls": "kafka",
         "brokers": ["localhost:%d" % kafka_server[1]],
         "client_id": "kafka_writer",
         "prefix": kafka_prefix,
     }
     storage_config = {
         "cls": "pipeline",
         "steps": [{"cls": "memory", "journal_writer": writer_config},],
     }

     storage = get_storage(**storage_config)

     expected_messages = 0

     for object_type, objects in TEST_OBJECTS.items():
         method = getattr(storage, object_type + "_add")
         if object_type in (
             "content",
             "directory",
             "revision",
             "release",
             "snapshot",
             "origin",
         ):
             method(objects)
             expected_messages += len(objects)
         elif object_type in ("origin_visit",):
             for obj in objects:
                 assert isinstance(obj, OriginVisit)
                 storage.origin_add_one(Origin(url=obj.origin))
                 visit = method(obj.origin, date=obj.date, type=obj.type)
                 expected_messages += 1

                 obj_d = obj.to_dict()
                 for k in ("visit", "origin", "date", "type"):
                     del obj_d[k]
                 storage.origin_visit_update(obj.origin, visit.visit, **obj_d)
                 expected_messages += 1
         else:
             assert False, object_type

     consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages)
     assert_all_objects_consumed(consumed_messages)
diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py
index 4a5902a..5925049 100644
--- a/swh/journal/tests/test_replay.py
+++ b/swh/journal/tests/test_replay.py
@@ -1,418 +1,414 @@
 # Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import datetime
 import functools
 import logging
 import random
 from subprocess import Popen
 from typing import Dict, List, Tuple

 import dateutil
 import pytest

 from confluent_kafka import Producer
 from hypothesis import strategies, given, settings

 from swh.storage import get_storage

 from swh.journal.client import JournalClient
 from swh.journal.serializers import key_to_kafka, value_to_kafka
 from swh.journal.replay import process_replay_objects, is_hash_in_bytearray
 from swh.model.hashutil import hash_to_hex
 from swh.model.model import Content

 from .conftest import TEST_OBJECT_DICTS, DUPLICATE_CONTENTS
 from .utils import MockedJournalClient, MockedKafkaWriter


 storage_config = {"cls": "pipeline", "steps": [{"cls": "memory"},]}


 def make_topic(kafka_prefix: str, object_type: str) -> str:
     return kafka_prefix + "." + object_type


 def test_storage_play(
     kafka_prefix: str,
     kafka_consumer_group: str,
     kafka_server: Tuple[Popen, int],
     caplog,
 ):
     """Optimal replayer scenario.

     This:
     - writes objects to the topic
     - replayer consumes objects from the topic and replay them

     """
     (_, port) = kafka_server
     kafka_prefix += ".swh.journal.objects"

     storage = get_storage(**storage_config)

     producer = Producer(
         {
             "bootstrap.servers": "localhost:{}".format(port),
             "client.id": "test producer",
             "acks": "all",
         }
     )

     now = datetime.datetime.now(tz=datetime.timezone.utc)

     # Fill Kafka
     nb_sent = 0
     nb_visits = 0
-    for (object_type, (_, objects)) in TEST_OBJECT_DICTS.items():
+    for object_type, objects in TEST_OBJECT_DICTS.items():
         topic = make_topic(kafka_prefix, object_type)
         for object_ in objects:
             key = bytes(random.randint(0, 255) for _ in range(40))
             object_ = object_.copy()
             if object_type == "content":
                 object_["ctime"] = now
             elif object_type == "origin_visit":
                 nb_visits += 1
                 object_["visit"] = nb_visits
             producer.produce(
                 topic=topic, key=key_to_kafka(key), value=value_to_kafka(object_),
             )
             nb_sent += 1

     producer.flush()

     caplog.set_level(logging.ERROR, "swh.journal.replay")
     # Fill the storage from Kafka
     replayer = JournalClient(
         brokers="localhost:%d" % kafka_server[1],
         group_id=kafka_consumer_group,
         prefix=kafka_prefix,
         stop_after_objects=nb_sent,
     )
     worker_fn = functools.partial(process_replay_objects, storage=storage)
     nb_inserted = 0
     while nb_inserted < nb_sent:
         nb_inserted += replayer.process(worker_fn)
     assert nb_sent == nb_inserted

     # Check the objects were actually inserted in the storage
-    assert TEST_OBJECT_DICTS["revision"][1] == list(
-        storage.revision_get([rev["id"] for rev in TEST_OBJECT_DICTS["revision"][1]])
+    assert TEST_OBJECT_DICTS["revision"] == list(
+        storage.revision_get([rev["id"] for rev in TEST_OBJECT_DICTS["revision"]])
     )
-    assert TEST_OBJECT_DICTS["release"][1] == list(
-        storage.release_get([rel["id"] for rel in TEST_OBJECT_DICTS["release"][1]])
+    assert TEST_OBJECT_DICTS["release"] == list(
+        storage.release_get([rel["id"] for rel in TEST_OBJECT_DICTS["release"]])
     )

-    origins = list(
-        storage.origin_get([orig for orig in TEST_OBJECT_DICTS["origin"][1]])
-    )
-    assert TEST_OBJECT_DICTS["origin"][1] == [{"url": orig["url"]} for orig in origins]
+    origins = list(storage.origin_get([orig for orig in TEST_OBJECT_DICTS["origin"]]))
+    assert TEST_OBJECT_DICTS["origin"] == [{"url": orig["url"]} for orig in origins]

     for origin in origins:
         origin_url = origin["url"]
         expected_visits = [
             {
                 **visit,
                 "origin": origin_url,
                 "date": dateutil.parser.parse(visit["date"]),
             }
-            for visit in TEST_OBJECT_DICTS["origin_visit"][1]
+            for visit in TEST_OBJECT_DICTS["origin_visit"]
             if visit["origin"] == origin["url"]
         ]
         actual_visits = list(storage.origin_visit_get(origin_url))
         for visit in actual_visits:
             del visit["visit"]  # opaque identifier
         assert expected_visits == actual_visits

-    input_contents = TEST_OBJECT_DICTS["content"][1]
+    input_contents = TEST_OBJECT_DICTS["content"]
     contents = storage.content_get_metadata([cont["sha1"] for cont in input_contents])
     assert len(contents) == len(input_contents)
     assert contents == {cont["sha1"]: [cont] for cont in input_contents}

     collision = 0
     for record in caplog.records:
         logtext = record.getMessage()
         if "Colliding contents:" in logtext:
             collision += 1

     assert collision == 0, "No collision should be detected"


 def test_storage_play_with_collision(
     kafka_prefix: str,
     kafka_consumer_group: str,
     kafka_server: Tuple[Popen, int],
     caplog,
 ):
     """Another replayer scenario with collisions.

     This:
     - writes objects to the topic, including colliding contents
     - replayer consumes objects from the topic and replay them
     - This drops the colliding contents from the replay when detected

     """
     (_, port) = kafka_server
     kafka_prefix += ".swh.journal.objects"

     storage = get_storage(**storage_config)

     producer = Producer(
         {
             "bootstrap.servers": "localhost:{}".format(port),
             "client.id": "test producer",
             "enable.idempotence": "true",
         }
     )

     now = datetime.datetime.now(tz=datetime.timezone.utc)

     # Fill Kafka
     nb_sent = 0
     nb_visits = 0
-    for (object_type, (_, objects)) in TEST_OBJECT_DICTS.items():
+    for object_type, objects in TEST_OBJECT_DICTS.items():
         topic = make_topic(kafka_prefix, object_type)
         for object_ in objects:
             key = bytes(random.randint(0, 255) for _ in range(40))
             object_ = object_.copy()
             if object_type == "content":
                 object_["ctime"] = now
             elif object_type == "origin_visit":
                 nb_visits += 1
                 object_["visit"] = nb_visits
             producer.produce(
                 topic=topic, key=key_to_kafka(key), value=value_to_kafka(object_),
             )
             nb_sent += 1

     # Create collision in input data
     # They are not written in the destination
     for content in DUPLICATE_CONTENTS:
         topic = make_topic(kafka_prefix, "content")
         producer.produce(
             topic=topic, key=key_to_kafka(key), value=value_to_kafka(content),
         )
         nb_sent += 1

     producer.flush()

     caplog.set_level(logging.ERROR, "swh.journal.replay")
     # Fill the storage from Kafka
     replayer = JournalClient(
         brokers="localhost:%d" % kafka_server[1],
         group_id=kafka_consumer_group,
         prefix=kafka_prefix,
         stop_after_objects=nb_sent,
     )
     worker_fn = functools.partial(process_replay_objects, storage=storage)
     nb_inserted = 0
     while nb_inserted < nb_sent:
         nb_inserted += replayer.process(worker_fn)
     assert nb_sent == nb_inserted

     # Check the objects were actually inserted in the storage
-    assert TEST_OBJECT_DICTS["revision"][1] == list(
-        storage.revision_get([rev["id"] for rev in TEST_OBJECT_DICTS["revision"][1]])
+    assert TEST_OBJECT_DICTS["revision"] == list(
+        storage.revision_get([rev["id"] for rev in TEST_OBJECT_DICTS["revision"]])
     )
-    assert TEST_OBJECT_DICTS["release"][1] == list(
-        storage.release_get([rel["id"] for rel in TEST_OBJECT_DICTS["release"][1]])
+    assert TEST_OBJECT_DICTS["release"] == list(
+        storage.release_get([rel["id"] for rel in TEST_OBJECT_DICTS["release"]])
     )

-    origins = list(
-        storage.origin_get([orig for orig in TEST_OBJECT_DICTS["origin"][1]])
-    )
-    assert TEST_OBJECT_DICTS["origin"][1] == [{"url": orig["url"]} for orig in origins]
+    origins = list(storage.origin_get([orig for orig in TEST_OBJECT_DICTS["origin"]]))
+    assert TEST_OBJECT_DICTS["origin"] == [{"url": orig["url"]} for orig in origins]

     for origin in origins:
         origin_url = origin["url"]
         expected_visits = [
             {
                 **visit,
                 "origin": origin_url,
                 "date": dateutil.parser.parse(visit["date"]),
             }
-            for visit in TEST_OBJECT_DICTS["origin_visit"][1]
+            for visit in TEST_OBJECT_DICTS["origin_visit"]
             if visit["origin"] == origin["url"]
         ]
         actual_visits = list(storage.origin_visit_get(origin_url))
         for visit in actual_visits:
             del visit["visit"]  # opaque identifier
         assert expected_visits == actual_visits

-    input_contents = TEST_OBJECT_DICTS["content"][1]
+    input_contents = TEST_OBJECT_DICTS["content"]
     contents = storage.content_get_metadata([cont["sha1"] for cont in input_contents])
     assert len(contents) == len(input_contents)
     assert contents == {cont["sha1"]: [cont] for cont in input_contents}

     nb_collisions = 0

     actual_collision: Dict
     for record in caplog.records:
         logtext = record.getMessage()
         if "Collision detected:" in logtext:
             nb_collisions += 1
             actual_collision = record.args["collision"]

     assert nb_collisions == 1, "1 collision should be detected"

     algo = "sha1"
     assert actual_collision["algo"] == algo
     expected_colliding_hash = hash_to_hex(DUPLICATE_CONTENTS[0][algo])
     assert actual_collision["hash"] == expected_colliding_hash

     actual_colliding_hashes = actual_collision["objects"]
     assert len(actual_colliding_hashes) == len(DUPLICATE_CONTENTS)
     for content in DUPLICATE_CONTENTS:
         expected_content_hashes = {
             k: hash_to_hex(v) for k, v in Content.from_dict(content).hashes().items()
         }
         assert expected_content_hashes in actual_colliding_hashes


 def _test_write_replay_origin_visit(visits: List[Dict]):
     """Helper function to write tests for origin_visit.

     Each visit (a dict) given in the 'visits' argument will be sent to
     a (mocked) kafka queue, which a in-memory-storage backed replayer
     is listening to.

     Check that corresponding origin visits entities are present in the
     storage and have correct values if they are not skipped.

     """
     queue: List = []
     replayer = MockedJournalClient(queue)
     writer = MockedKafkaWriter(queue)

     # Note that flipping the order of these two insertions will crash
     # the test, because the legacy origin_format does not allow to create
     # the origin when needed (type is missing)
     writer.send(
         "origin",
         "foo",
         {
             "url": "http://example.com/",
             "type": "git",  # test the legacy origin format is accepted
         },
     )
     for visit in visits:
         writer.send("origin_visit", "foo", visit)

     queue_size = len(queue)
     assert replayer.stop_after_objects is None
     replayer.stop_after_objects = queue_size

     storage = get_storage(**storage_config)
     worker_fn = functools.partial(process_replay_objects, storage=storage)

     replayer.process(worker_fn)

     actual_visits = list(storage.origin_visit_get("http://example.com/"))

     assert len(actual_visits) == len(visits), actual_visits

     for vin, vout in zip(visits, actual_visits):
         vin = vin.copy()
         vout = vout.copy()
         assert vout.pop("origin") == "http://example.com/"
         vin.pop("origin")
         vin.setdefault("type", "git")
         vin.setdefault("metadata", None)
         assert vin == vout


 def test_write_replay_origin_visit():
     """Test origin_visit when the 'origin' is just a string."""
     now = datetime.datetime.now()
     visits = [
         {
             "visit": 1,
             "origin": "http://example.com/",
             "date": now,
             "type": "git",
             "status": "partial",
             "snapshot": None,
         }
     ]
     _test_write_replay_origin_visit(visits)


 def test_write_replay_legacy_origin_visit1():
     """Origin_visit with no types should make the replayer crash

     We expect the journal's origin_visit topic to no longer reference
     such visits. If it does, the replayer must crash so we can fix the
     journal's topic.

     """
     now = datetime.datetime.now()
     visit = {
         "visit": 1,
         "origin": "http://example.com/",
         "date": now,
         "status": "partial",
         "snapshot": None,
     }
     now2 = datetime.datetime.now()
     visit2 = {
         "visit": 2,
         "origin": {"url": "http://example.com/"},
         "date": now2,
         "status": "partial",
         "snapshot": None,
     }

     for origin_visit in [visit, visit2]:
         with pytest.raises(ValueError, match="Old origin visit format"):
             _test_write_replay_origin_visit([origin_visit])


 def test_write_replay_legacy_origin_visit2():
     """Test origin_visit when 'type' is missing from the visit, but not
     from the origin."""
     now = datetime.datetime.now()
     visits = [
         {
             "visit": 1,
             "origin": {"url": "http://example.com/", "type": "git",},
             "date": now,
             "type": "git",
             "status": "partial",
             "snapshot": None,
         }
     ]
     _test_write_replay_origin_visit(visits)


 def test_write_replay_legacy_origin_visit3():
     """Test origin_visit when the origin is a dict"""
     now = datetime.datetime.now()
     visits = [
         {
             "visit": 1,
             "origin": {"url": "http://example.com/",},
             "date": now,
             "type": "git",
             "status": "partial",
             "snapshot": None,
         }
     ]
     _test_write_replay_origin_visit(visits)


 hash_strategy = strategies.binary(min_size=20, max_size=20)


 @settings(max_examples=500)
 @given(
     strategies.sets(hash_strategy, min_size=0, max_size=500),
     strategies.sets(hash_strategy, min_size=10),
 )
 def test_is_hash_in_bytearray(haystack, needles):
     array = b"".join(sorted(haystack))
     needles |= haystack  # Exhaustively test for all objects in the array
     for needle in needles:
         assert is_hash_in_bytearray(needle, array, len(haystack)) == (
             needle in haystack
         )