D3043.id10824.diff

diff --git a/setup.py b/setup.py
--- a/setup.py
+++ b/setup.py
@@ -51,6 +51,8 @@
swh-journal=swh.journal.cli:main
[swh.cli.subcommands]
journal=swh.journal.cli:cli
+ [pytest11]
+ pytest_swh_journal = swh.journal.pytest_plugin
""",
install_requires=parse_requirements() + parse_requirements("swh"),
setup_requires=["vcversioner"],
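
(Not part of the patch: the [pytest11] entry point is how pytest discovers installed plugins, so the fixtures defined in swh.journal.pytest_plugin become available to any package that depends on swh.journal, without copying conftest code. A minimal sanity check, using setuptools' pkg_resources API:

    import pkg_resources

    for ep in pkg_resources.iter_entry_points("pytest11"):
        print(ep.name, "->", ep.module_name)
    # once swh.journal is installed, this should list:
    # pytest_swh_journal -> swh.journal.pytest_plugin
)
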
diff --git a/swh/journal/pytest_plugin.py b/swh/journal/pytest_plugin.py
new file mode 100644
--- /dev/null
+++ b/swh/journal/pytest_plugin.py
@@ -0,0 +1,153 @@
+# Copyright (C) 2019-2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import random
+import string
+
+from typing import Dict, Iterator
+from collections import defaultdict
+
+import pytest
+
+from confluent_kafka import Consumer, Producer, KafkaException
+
+from swh.journal.serializers import object_key, kafka_to_key, kafka_to_value
+from swh.journal.tests.journal_data import TEST_OBJECTS, TEST_OBJECT_DICTS
+
+
+def consume_messages(consumer, kafka_prefix, expected_messages):
+ """Consume expected_messages from the consumer;
+ Sort them all into a consumed_objects dict"""
+ consumed_messages = defaultdict(list)
+
+ fetched_messages = 0
+ retries_left = 1000
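+ # An empty poll or a non-fatal error consumes one retry; with the 0.01s
+ # poll timeout below, this bounds the wait to roughly ten seconds.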
+
+ while fetched_messages < expected_messages:
+ if retries_left == 0:
+ raise ValueError("Timed out fetching messages from kafka")
+
+ msg = consumer.poll(timeout=0.01)
+
+ if not msg:
+ retries_left -= 1
+ continue
+
+ error = msg.error()
+ if error is not None:
+ if error.fatal():
+ raise KafkaException(error)
+ retries_left -= 1
+ continue
+
+ fetched_messages += 1
+ topic = msg.topic()
+ assert topic.startswith(kafka_prefix + "."), "Unexpected topic"
+ object_type = topic[len(kafka_prefix + ".") :]
+
+ consumed_messages[object_type].append(
+ (kafka_to_key(msg.key()), kafka_to_value(msg.value()))
+ )
+
+ return consumed_messages
+
+
+def assert_all_objects_consumed(consumed_messages):
+ """Check whether all objects from TEST_OBJECT_DICTS have been consumed"""
+ for object_type, known_values in TEST_OBJECT_DICTS.items():
+ known_keys = [object_key(object_type, obj) for obj in TEST_OBJECTS[object_type]]
+
+ (received_keys, received_values) = zip(*consumed_messages[object_type])
+
+ if object_type == "origin_visit":
+ for value in received_values:
+ del value["visit"]
+ elif object_type == "content":
+ for value in received_values:
+ del value["ctime"]
+
+ for key in known_keys:
+ assert key in received_keys
+
+ for value in known_values:
+ assert value in received_values
+
+
+@pytest.fixture(scope="function")
+def kafka_prefix():
+ """Pick a random prefix for kafka topics on each call"""
+ return "".join(random.choice(string.ascii_lowercase) for _ in range(10))
+
+
+@pytest.fixture(scope="function")
+def kafka_consumer_group(kafka_prefix: str):
+ """Pick a random consumer group for kafka consumers on each call"""
+ return "test-consumer-%s" % kafka_prefix
+
+
+@pytest.fixture(scope="session")
+def kafka_server() -> Iterator[str]:
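+ # librdkafka's "test.mock.num.brokers" option starts an in-process mock
+ # Kafka cluster, so the test session needs no external broker.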
+ p = Producer({"test.mock.num.brokers": "1"})
+
+ metadata = p.list_topics()
+ brokers = [str(broker) for broker in metadata.brokers.values()]
+ assert len(brokers) == 1, "Expected exactly one broker in the kafka cluster"
+
+ broker_connstr, broker_id = brokers[0].split("/")
+ ip, port_str = broker_connstr.split(":")
+ assert ip == "127.0.0.1"
+ assert int(port_str)
+
+ yield broker_connstr
+
+ p.flush()
+
+
+TEST_CONFIG = {
+ "consumer_id": "swh.journal.consumer",
+ "object_types": TEST_OBJECT_DICTS.keys(),
+ "stop_after_objects": 1, # will read 1 object and stop
+ "storage": {"cls": "memory", "args": {}},
+}
+
+
+@pytest.fixture
+def test_config(kafka_server: str, kafka_prefix: str):
+ """Test configuration needed for producer/consumer
+
+ """
+ return {
+ **TEST_CONFIG,
+ "brokers": [kafka_server],
+ "prefix": kafka_prefix + ".swh.journal.objects",
+ }
+
+
+@pytest.fixture
+def consumer(
+ kafka_server: str, test_config: Dict, kafka_consumer_group: str,
+) -> Consumer:
+ """Get a connected Kafka consumer.
+
+ """
+ consumer = Consumer(
+ {
+ "bootstrap.servers": kafka_server,
+ "auto.offset.reset": "earliest",
+ "enable.auto.commit": True,
+ "group.id": kafka_consumer_group,
+ }
+ )
+
+ kafka_topics = [
+ "%s.%s" % (test_config["prefix"], object_type)
+ for object_type in test_config["object_types"]
+ ]
+
+ consumer.subscribe(kafka_topics)
+
+ yield consumer
+
+ consumer.close()
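
(Not part of the patch: a minimal sketch of a downstream test using the plugin fixtures and the helpers above; the producing step is elided.

    from confluent_kafka import Consumer

    from swh.journal.pytest_plugin import (
        assert_all_objects_consumed,
        consume_messages,
    )
    from swh.journal.tests.journal_data import TEST_OBJECTS

    def test_sketch(consumer: Consumer, test_config: dict):
        # ... first write every object from TEST_OBJECTS to the
        # "<prefix>.<object_type>" topics ...
        expected = sum(len(objs) for objs in TEST_OBJECTS.values())
        consumed = consume_messages(consumer, test_config["prefix"], expected)
        assert_all_objects_consumed(consumed)
)
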
diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py
--- a/swh/journal/tests/conftest.py
+++ b/swh/journal/tests/conftest.py
@@ -1,243 +1,19 @@
-# Copyright (C) 2019 The Software Heritage developers
+# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import datetime
-import pytest
import logging
-import random
-import string
-
-from confluent_kafka import Consumer, Producer
from hypothesis.strategies import one_of
-from typing import Any, Dict, Iterator, List
from swh.model import hypothesis_strategies as strategies
-from swh.model.hashutil import MultiHash, hash_to_bytes
-
-from swh.journal.serializers import ModelObject
-from swh.journal.writer.kafka import OBJECT_TYPES
+# for bw compat
+from swh.journal.tests.journal_data import * # noqa
logger = logging.getLogger(__name__)
-CONTENTS = [
- {**MultiHash.from_data(b"foo").digest(), "length": 3, "status": "visible",},
-]
-
-duplicate_content1 = {
- "length": 4,
- "sha1": hash_to_bytes("44973274ccef6ab4dfaaf86599792fa9c3fe4689"),
- "sha1_git": b"another-foo",
- "blake2s256": b"another-bar",
- "sha256": b"another-baz",
- "status": "visible",
-}
-
-# Craft a sha1 collision
-duplicate_content2 = duplicate_content1.copy()
-sha1_array = bytearray(duplicate_content1["sha1_git"])
-sha1_array[0] += 1
-duplicate_content2["sha1_git"] = bytes(sha1_array)
-
-
-DUPLICATE_CONTENTS = [duplicate_content1, duplicate_content2]
-
-
-COMMITTERS = [
- {"fullname": b"foo", "name": b"foo", "email": b"",},
- {"fullname": b"bar", "name": b"bar", "email": b"",},
-]
-
-DATES = [
- {
- "timestamp": {"seconds": 1234567891, "microseconds": 0,},
- "offset": 120,
- "negative_utc": False,
- },
- {
- "timestamp": {"seconds": 1234567892, "microseconds": 0,},
- "offset": 120,
- "negative_utc": False,
- },
-]
-
-REVISIONS = [
- {
- "id": hash_to_bytes("7026b7c1a2af56521e951c01ed20f255fa054238"),
- "message": b"hello",
- "date": DATES[0],
- "committer": COMMITTERS[0],
- "author": COMMITTERS[0],
- "committer_date": DATES[0],
- "type": "git",
- "directory": b"\x01" * 20,
- "synthetic": False,
- "metadata": None,
- "parents": [],
- },
- {
- "id": hash_to_bytes("368a48fe15b7db2383775f97c6b247011b3f14f4"),
- "message": b"hello again",
- "date": DATES[1],
- "committer": COMMITTERS[1],
- "author": COMMITTERS[1],
- "committer_date": DATES[1],
- "type": "hg",
- "directory": b"\x02" * 20,
- "synthetic": False,
- "metadata": None,
- "parents": [],
- },
-]
-
-RELEASES = [
- {
- "id": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"),
- "name": b"v0.0.1",
- "date": {
- "timestamp": {"seconds": 1234567890, "microseconds": 0,},
- "offset": 120,
- "negative_utc": False,
- },
- "author": COMMITTERS[0],
- "target_type": "revision",
- "target": b"\x04" * 20,
- "message": b"foo",
- "synthetic": False,
- },
-]
-
-ORIGINS = [
- {"url": "https://somewhere.org/den/fox",},
- {"url": "https://overtherainbow.org/fox/den",},
-]
-
-ORIGIN_VISITS = [
- {
- "origin": ORIGINS[0]["url"],
- "date": "2013-05-07 04:20:39.369271+00:00",
- "snapshot": None, # TODO
- "status": "ongoing", # TODO
- "metadata": {"foo": "bar"},
- "type": "git",
- },
- {
- "origin": ORIGINS[0]["url"],
- "date": "2018-11-27 17:20:39+00:00",
- "snapshot": None, # TODO
- "status": "ongoing", # TODO
- "metadata": {"baz": "qux"},
- "type": "git",
- },
-]
-
-TEST_OBJECT_DICTS: Dict[str, List[Dict[str, Any]]] = {
- "content": CONTENTS,
- "revision": REVISIONS,
- "release": RELEASES,
- "origin": ORIGINS,
- "origin_visit": ORIGIN_VISITS,
-}
-
-MODEL_OBJECTS = {v: k for (k, v) in OBJECT_TYPES.items()}
-
-TEST_OBJECTS: Dict[str, List[ModelObject]] = {}
-
-for object_type, objects in TEST_OBJECT_DICTS.items():
- converted_objects: List[ModelObject] = []
- model = MODEL_OBJECTS[object_type]
-
- for (num, obj_d) in enumerate(objects):
- if object_type == "origin_visit":
- obj_d = {**obj_d, "visit": num}
- elif object_type == "content":
- obj_d = {**obj_d, "data": b"", "ctime": datetime.datetime.now()}
-
- converted_objects.append(model.from_dict(obj_d))
-
- TEST_OBJECTS[object_type] = converted_objects
-
-
-@pytest.fixture(scope="function")
-def kafka_prefix():
- """Pick a random prefix for kafka topics on each call"""
- return "".join(random.choice(string.ascii_lowercase) for _ in range(10))
-
-
-@pytest.fixture(scope="function")
-def kafka_consumer_group(kafka_prefix: str):
- """Pick a random consumer group for kafka consumers on each call"""
- return "test-consumer-%s" % kafka_prefix
-
-
-@pytest.fixture(scope="session")
-def kafka_server() -> Iterator[str]:
- p = Producer({"test.mock.num.brokers": "1"})
-
- metadata = p.list_topics()
- brokers = [str(broker) for broker in metadata.brokers.values()]
- assert len(brokers) == 1, "More than one broker found in the kafka cluster?!"
-
- broker_connstr, broker_id = brokers[0].split("/")
- ip, port_str = broker_connstr.split(":")
- assert ip == "127.0.0.1"
- assert int(port_str)
-
- yield broker_connstr
-
- p.flush()
-
-
-TEST_CONFIG = {
- "consumer_id": "swh.journal.consumer",
- "object_types": TEST_OBJECT_DICTS.keys(),
- "stop_after_objects": 1, # will read 1 object and stop
- "storage": {"cls": "memory", "args": {}},
-}
-
-
-@pytest.fixture
-def test_config(kafka_server: str, kafka_prefix: str):
- """Test configuration needed for producer/consumer
-
- """
- return {
- **TEST_CONFIG,
- "brokers": [kafka_server],
- "prefix": kafka_prefix + ".swh.journal.objects",
- }
-
-
-@pytest.fixture
-def consumer(
- kafka_server: str, test_config: Dict, kafka_consumer_group: str,
-) -> Consumer:
- """Get a connected Kafka consumer.
-
- """
- consumer = Consumer(
- {
- "bootstrap.servers": kafka_server,
- "auto.offset.reset": "earliest",
- "enable.auto.commit": True,
- "group.id": kafka_consumer_group,
- }
- )
-
- kafka_topics = [
- "%s.%s" % (test_config["prefix"], object_type)
- for object_type in test_config["object_types"]
- ]
-
- consumer.subscribe(kafka_topics)
-
- yield consumer
-
- consumer.close()
-
def objects_d():
return one_of(
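
(Not part of the patch: with the star re-export kept for backwards compatibility, both import paths resolve to the same objects, so existing imports from conftest keep working.

    from swh.journal.tests.conftest import TEST_OBJECTS as via_conftest
    from swh.journal.tests.journal_data import TEST_OBJECTS as via_module

    assert via_conftest is via_module
)
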
diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/journal_data.py
copy from swh/journal/tests/conftest.py
copy to swh/journal/tests/journal_data.py
--- a/swh/journal/tests/conftest.py
+++ b/swh/journal/tests/journal_data.py
@@ -1,28 +1,17 @@
-# Copyright (C) 2019 The Software Heritage developers
+# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
-import pytest
-import logging
-import random
-import string
-from confluent_kafka import Consumer, Producer
+from typing import Any, Dict, List
-from hypothesis.strategies import one_of
-from typing import Any, Dict, Iterator, List
-
-from swh.model import hypothesis_strategies as strategies
from swh.model.hashutil import MultiHash, hash_to_bytes
-
from swh.journal.serializers import ModelObject
from swh.journal.writer.kafka import OBJECT_TYPES
-logger = logging.getLogger(__name__)
-
CONTENTS = [
{**MultiHash.from_data(b"foo").digest(), "length": 3, "status": "visible",},
]
@@ -159,94 +148,3 @@
converted_objects.append(model.from_dict(obj_d))
TEST_OBJECTS[object_type] = converted_objects
-
-
-@pytest.fixture(scope="function")
-def kafka_prefix():
- """Pick a random prefix for kafka topics on each call"""
- return "".join(random.choice(string.ascii_lowercase) for _ in range(10))
-
-
-@pytest.fixture(scope="function")
-def kafka_consumer_group(kafka_prefix: str):
- """Pick a random consumer group for kafka consumers on each call"""
- return "test-consumer-%s" % kafka_prefix
-
-
-@pytest.fixture(scope="session")
-def kafka_server() -> Iterator[str]:
- p = Producer({"test.mock.num.brokers": "1"})
-
- metadata = p.list_topics()
- brokers = [str(broker) for broker in metadata.brokers.values()]
- assert len(brokers) == 1, "More than one broker found in the kafka cluster?!"
-
- broker_connstr, broker_id = brokers[0].split("/")
- ip, port_str = broker_connstr.split(":")
- assert ip == "127.0.0.1"
- assert int(port_str)
-
- yield broker_connstr
-
- p.flush()
-
-
-TEST_CONFIG = {
- "consumer_id": "swh.journal.consumer",
- "object_types": TEST_OBJECT_DICTS.keys(),
- "stop_after_objects": 1, # will read 1 object and stop
- "storage": {"cls": "memory", "args": {}},
-}
-
-
-@pytest.fixture
-def test_config(kafka_server: str, kafka_prefix: str):
- """Test configuration needed for producer/consumer
-
- """
- return {
- **TEST_CONFIG,
- "brokers": [kafka_server],
- "prefix": kafka_prefix + ".swh.journal.objects",
- }
-
-
-@pytest.fixture
-def consumer(
- kafka_server: str, test_config: Dict, kafka_consumer_group: str,
-) -> Consumer:
- """Get a connected Kafka consumer.
-
- """
- consumer = Consumer(
- {
- "bootstrap.servers": kafka_server,
- "auto.offset.reset": "earliest",
- "enable.auto.commit": True,
- "group.id": kafka_consumer_group,
- }
- )
-
- kafka_topics = [
- "%s.%s" % (test_config["prefix"], object_type)
- for object_type in test_config["object_types"]
- ]
-
- consumer.subscribe(kafka_topics)
-
- yield consumer
-
- consumer.close()
-
-
-def objects_d():
- return one_of(
- strategies.origins().map(lambda x: ("origin", x.to_dict())),
- strategies.origin_visits().map(lambda x: ("origin_visit", x.to_dict())),
- strategies.snapshots().map(lambda x: ("snapshot", x.to_dict())),
- strategies.releases().map(lambda x: ("release", x.to_dict())),
- strategies.revisions().map(lambda x: ("revision", x.to_dict())),
- strategies.directories().map(lambda x: ("directory", x.to_dict())),
- strategies.skipped_contents().map(lambda x: ("skipped_content", x.to_dict())),
- strategies.present_contents().map(lambda x: ("content", x.to_dict())),
- )
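
(Not part of the patch: objects_d, which stays in conftest.py rather than moving here, is a hypothesis strategy producing ("object_type", dict) pairs; a sketch of property-based use.

    from hypothesis import given

    from swh.journal.tests.conftest import objects_d

    @given(objects_d())
    def test_objects_d_sketch(obj):
        object_type, obj_dict = obj
        assert isinstance(object_type, str)
        assert isinstance(obj_dict, dict)
)
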
diff --git a/swh/journal/tests/test_kafka_writer.py b/swh/journal/tests/test_kafka_writer.py
--- a/swh/journal/tests/test_kafka_writer.py
+++ b/swh/journal/tests/test_kafka_writer.py
@@ -3,77 +3,16 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from collections import defaultdict
-
import pytest
-from confluent_kafka import Consumer, Producer, KafkaException
+from confluent_kafka import Consumer, Producer
from swh.storage import get_storage
-from swh.journal.serializers import object_key, kafka_to_key, kafka_to_value
-from swh.journal.writer.kafka import KafkaJournalWriter, KafkaDeliveryError
-
from swh.model.model import Directory, Origin, OriginVisit
-from .conftest import TEST_OBJECTS, TEST_OBJECT_DICTS
-
-
-def consume_messages(consumer, kafka_prefix, expected_messages):
- """Consume expected_messages from the consumer;
- Sort them all into a consumed_objects dict"""
- consumed_messages = defaultdict(list)
-
- fetched_messages = 0
- retries_left = 1000
-
- while fetched_messages < expected_messages:
- if retries_left == 0:
- raise ValueError("Timed out fetching messages from kafka")
-
- msg = consumer.poll(timeout=0.01)
-
- if not msg:
- retries_left -= 1
- continue
-
- error = msg.error()
- if error is not None:
- if error.fatal():
- raise KafkaException(error)
- retries_left -= 1
- continue
-
- fetched_messages += 1
- topic = msg.topic()
- assert topic.startswith(kafka_prefix + "."), "Unexpected topic"
- object_type = topic[len(kafka_prefix + ".") :]
-
- consumed_messages[object_type].append(
- (kafka_to_key(msg.key()), kafka_to_value(msg.value()))
- )
-
- return consumed_messages
-
-
-def assert_all_objects_consumed(consumed_messages):
- """Check whether all objects from TEST_OBJECT_DICTS have been consumed"""
- for object_type, known_values in TEST_OBJECT_DICTS.items():
- known_keys = [object_key(object_type, obj) for obj in TEST_OBJECTS[object_type]]
-
- (received_keys, received_values) = zip(*consumed_messages[object_type])
-
- if object_type == "origin_visit":
- for value in received_values:
- del value["visit"]
- elif object_type == "content":
- for value in received_values:
- del value["ctime"]
-
- for key in known_keys:
- assert key in received_keys
-
- for value in known_values:
- assert value in received_values
+from swh.journal.tests.journal_data import TEST_OBJECTS
+from swh.journal.pytest_plugin import consume_messages, assert_all_objects_consumed
+from swh.journal.writer.kafka import KafkaJournalWriter, KafkaDeliveryError
def test_kafka_writer(kafka_prefix: str, kafka_server: str, consumer: Consumer):
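
(Not part of the patch, and the constructor and method names below are assumptions since this diff does not show them: a sketch of the writer side these tests exercise, publishing model objects under "<prefix>.<object_type>" topics against the mock broker.

    from swh.journal.tests.journal_data import TEST_OBJECTS
    from swh.journal.writer.kafka import KafkaJournalWriter

    writer = KafkaJournalWriter(
        brokers=["127.0.0.1:9092"],  # assumed: the kafka_server fixture value
        client_id="test-kafka-writer",
        prefix="test.swh.journal.objects",
    )
    for object_type, objects in TEST_OBJECTS.items():
        writer.write_additions(object_type, objects)  # assumed method name
)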
