D3043: Extract kafka-related pytest fixtures in a pytest plugin module
D3043.id10824.diff (18 KB)
diff --git a/setup.py b/setup.py
--- a/setup.py
+++ b/setup.py
@@ -51,6 +51,8 @@
swh-journal=swh.journal.cli:main
[swh.cli.subcommands]
journal=swh.journal.cli:cli
+ [pytest11]
+ pytest_swh_journal = swh.journal.pytest_plugin
""",
install_requires=parse_requirements() + parse_requirements("swh"),
setup_requires=["vcversioner"],
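Note on the setup.py hunk above: declaring the module under the pytest11 entry point makes pytest discover and load swh.journal.pytest_plugin automatically in any environment where the package is installed, so downstream test suites get its fixtures without importing anything in their own conftest.py. A minimal sketch of what this enables; the test file and its contents are hypothetical and not part of this diff:

# test_plugin_autoload.py -- hypothetical downstream test module.
# No conftest.py import is needed: pytest loads the plugin through
# the pytest11 entry point declared in setup.py above.

def test_fixtures_are_autoloaded(kafka_prefix, kafka_consumer_group):
    # kafka_prefix is a random 10-letter lowercase string and
    # kafka_consumer_group is derived from it (see the plugin below).
    assert len(kafka_prefix) == 10
    assert kafka_consumer_group == "test-consumer-%s" % kafka_prefix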
diff --git a/swh/journal/pytest_plugin.py b/swh/journal/pytest_plugin.py
new file mode 100644
--- /dev/null
+++ b/swh/journal/pytest_plugin.py
@@ -0,0 +1,153 @@
+# Copyright (C) 2019-2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import random
+import string
+
+from typing import Dict, Iterator
+from collections import defaultdict
+
+import pytest
+
+from confluent_kafka import Consumer, Producer, KafkaException
+
+from swh.journal.serializers import object_key, kafka_to_key, kafka_to_value
+from swh.journal.tests.journal_data import TEST_OBJECTS, TEST_OBJECT_DICTS
+
+
+def consume_messages(consumer, kafka_prefix, expected_messages):
+ """Consume expected_messages from the consumer;
+ Sort them all into a consumed_objects dict"""
+ consumed_messages = defaultdict(list)
+
+ fetched_messages = 0
+ retries_left = 1000
+
+ while fetched_messages < expected_messages:
+ if retries_left == 0:
+ raise ValueError("Timed out fetching messages from kafka")
+
+ msg = consumer.poll(timeout=0.01)
+
+ if not msg:
+ retries_left -= 1
+ continue
+
+ error = msg.error()
+ if error is not None:
+ if error.fatal():
+ raise KafkaException(error)
+ retries_left -= 1
+ continue
+
+ fetched_messages += 1
+ topic = msg.topic()
+ assert topic.startswith(kafka_prefix + "."), "Unexpected topic"
+ object_type = topic[len(kafka_prefix + ".") :]
+
+ consumed_messages[object_type].append(
+ (kafka_to_key(msg.key()), kafka_to_value(msg.value()))
+ )
+
+ return consumed_messages
+
+
+def assert_all_objects_consumed(consumed_messages):
+ """Check whether all objects from TEST_OBJECT_DICTS have been consumed"""
+ for object_type, known_values in TEST_OBJECT_DICTS.items():
+ known_keys = [object_key(object_type, obj) for obj in TEST_OBJECTS[object_type]]
+
+ (received_keys, received_values) = zip(*consumed_messages[object_type])
+
+ if object_type == "origin_visit":
+ for value in received_values:
+ del value["visit"]
+ elif object_type == "content":
+ for value in received_values:
+ del value["ctime"]
+
+ for key in known_keys:
+ assert key in received_keys
+
+ for value in known_values:
+ assert value in received_values
+
+
+@pytest.fixture(scope="function")
+def kafka_prefix():
+ """Pick a random prefix for kafka topics on each call"""
+ return "".join(random.choice(string.ascii_lowercase) for _ in range(10))
+
+
+@pytest.fixture(scope="function")
+def kafka_consumer_group(kafka_prefix: str):
+ """Pick a random consumer group for kafka consumers on each call"""
+ return "test-consumer-%s" % kafka_prefix
+
+
+@pytest.fixture(scope="session")
+def kafka_server() -> Iterator[str]:
+ p = Producer({"test.mock.num.brokers": "1"})
+
+ metadata = p.list_topics()
+ brokers = [str(broker) for broker in metadata.brokers.values()]
+ assert len(brokers) == 1, "More than one broker found in the kafka cluster?!"
+
+ broker_connstr, broker_id = brokers[0].split("/")
+ ip, port_str = broker_connstr.split(":")
+ assert ip == "127.0.0.1"
+ assert int(port_str)
+
+ yield broker_connstr
+
+ p.flush()
+
+
+TEST_CONFIG = {
+ "consumer_id": "swh.journal.consumer",
+ "object_types": TEST_OBJECT_DICTS.keys(),
+ "stop_after_objects": 1, # will read 1 object and stop
+ "storage": {"cls": "memory", "args": {}},
+}
+
+
+@pytest.fixture
+def test_config(kafka_server: str, kafka_prefix: str):
+ """Test configuration needed for producer/consumer
+
+ """
+ return {
+ **TEST_CONFIG,
+ "brokers": [kafka_server],
+ "prefix": kafka_prefix + ".swh.journal.objects",
+ }
+
+
+@pytest.fixture
+def consumer(
+ kafka_server: str, test_config: Dict, kafka_consumer_group: str,
+) -> Consumer:
+ """Get a connected Kafka consumer.
+
+ """
+ consumer = Consumer(
+ {
+ "bootstrap.servers": kafka_server,
+ "auto.offset.reset": "earliest",
+ "enable.auto.commit": True,
+ "group.id": kafka_consumer_group,
+ }
+ )
+
+ kafka_topics = [
+ "%s.%s" % (test_config["prefix"], object_type)
+ for object_type in test_config["object_types"]
+ ]
+
+ consumer.subscribe(kafka_topics)
+
+ yield consumer
+
+ consumer.close()
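The helpers in this new module rely on a dotted topic-naming convention: the consumer fixture subscribes to "<prefix>.<object_type>" topics, and consume_messages recovers the object type by stripping the prefix back off. A standalone illustration of that round trip; the prefix value is made up, built the way the test_config fixture builds it:

# Illustration of the topic naming convention used by the plugin above.
prefix = "abcdefghij" + ".swh.journal.objects"   # hypothetical prefix
topic = "%s.%s" % (prefix, "revision")           # as built by the consumer fixture
assert topic.startswith(prefix + ".")
object_type = topic[len(prefix + "."):]          # as parsed by consume_messages
assert object_type == "revision"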
diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py
--- a/swh/journal/tests/conftest.py
+++ b/swh/journal/tests/conftest.py
@@ -1,243 +1,19 @@
-# Copyright (C) 2019 The Software Heritage developers
+# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import datetime
-import pytest
import logging
-import random
-import string
-
-from confluent_kafka import Consumer, Producer
from hypothesis.strategies import one_of
-from typing import Any, Dict, Iterator, List
from swh.model import hypothesis_strategies as strategies
-from swh.model.hashutil import MultiHash, hash_to_bytes
-
-from swh.journal.serializers import ModelObject
-from swh.journal.writer.kafka import OBJECT_TYPES
+# for bw compat
+from swh.journal.tests.journal_data import * # noqa
logger = logging.getLogger(__name__)
-CONTENTS = [
- {**MultiHash.from_data(b"foo").digest(), "length": 3, "status": "visible",},
-]
-
-duplicate_content1 = {
- "length": 4,
- "sha1": hash_to_bytes("44973274ccef6ab4dfaaf86599792fa9c3fe4689"),
- "sha1_git": b"another-foo",
- "blake2s256": b"another-bar",
- "sha256": b"another-baz",
- "status": "visible",
-}
-
-# Craft a sha1 collision
-duplicate_content2 = duplicate_content1.copy()
-sha1_array = bytearray(duplicate_content1["sha1_git"])
-sha1_array[0] += 1
-duplicate_content2["sha1_git"] = bytes(sha1_array)
-
-
-DUPLICATE_CONTENTS = [duplicate_content1, duplicate_content2]
-
-
-COMMITTERS = [
- {"fullname": b"foo", "name": b"foo", "email": b"",},
- {"fullname": b"bar", "name": b"bar", "email": b"",},
-]
-
-DATES = [
- {
- "timestamp": {"seconds": 1234567891, "microseconds": 0,},
- "offset": 120,
- "negative_utc": False,
- },
- {
- "timestamp": {"seconds": 1234567892, "microseconds": 0,},
- "offset": 120,
- "negative_utc": False,
- },
-]
-
-REVISIONS = [
- {
- "id": hash_to_bytes("7026b7c1a2af56521e951c01ed20f255fa054238"),
- "message": b"hello",
- "date": DATES[0],
- "committer": COMMITTERS[0],
- "author": COMMITTERS[0],
- "committer_date": DATES[0],
- "type": "git",
- "directory": b"\x01" * 20,
- "synthetic": False,
- "metadata": None,
- "parents": [],
- },
- {
- "id": hash_to_bytes("368a48fe15b7db2383775f97c6b247011b3f14f4"),
- "message": b"hello again",
- "date": DATES[1],
- "committer": COMMITTERS[1],
- "author": COMMITTERS[1],
- "committer_date": DATES[1],
- "type": "hg",
- "directory": b"\x02" * 20,
- "synthetic": False,
- "metadata": None,
- "parents": [],
- },
-]
-
-RELEASES = [
- {
- "id": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"),
- "name": b"v0.0.1",
- "date": {
- "timestamp": {"seconds": 1234567890, "microseconds": 0,},
- "offset": 120,
- "negative_utc": False,
- },
- "author": COMMITTERS[0],
- "target_type": "revision",
- "target": b"\x04" * 20,
- "message": b"foo",
- "synthetic": False,
- },
-]
-
-ORIGINS = [
- {"url": "https://somewhere.org/den/fox",},
- {"url": "https://overtherainbow.org/fox/den",},
-]
-
-ORIGIN_VISITS = [
- {
- "origin": ORIGINS[0]["url"],
- "date": "2013-05-07 04:20:39.369271+00:00",
- "snapshot": None, # TODO
- "status": "ongoing", # TODO
- "metadata": {"foo": "bar"},
- "type": "git",
- },
- {
- "origin": ORIGINS[0]["url"],
- "date": "2018-11-27 17:20:39+00:00",
- "snapshot": None, # TODO
- "status": "ongoing", # TODO
- "metadata": {"baz": "qux"},
- "type": "git",
- },
-]
-
-TEST_OBJECT_DICTS: Dict[str, List[Dict[str, Any]]] = {
- "content": CONTENTS,
- "revision": REVISIONS,
- "release": RELEASES,
- "origin": ORIGINS,
- "origin_visit": ORIGIN_VISITS,
-}
-
-MODEL_OBJECTS = {v: k for (k, v) in OBJECT_TYPES.items()}
-
-TEST_OBJECTS: Dict[str, List[ModelObject]] = {}
-
-for object_type, objects in TEST_OBJECT_DICTS.items():
- converted_objects: List[ModelObject] = []
- model = MODEL_OBJECTS[object_type]
-
- for (num, obj_d) in enumerate(objects):
- if object_type == "origin_visit":
- obj_d = {**obj_d, "visit": num}
- elif object_type == "content":
- obj_d = {**obj_d, "data": b"", "ctime": datetime.datetime.now()}
-
- converted_objects.append(model.from_dict(obj_d))
-
- TEST_OBJECTS[object_type] = converted_objects
-
-
-@pytest.fixture(scope="function")
-def kafka_prefix():
- """Pick a random prefix for kafka topics on each call"""
- return "".join(random.choice(string.ascii_lowercase) for _ in range(10))
-
-
-@pytest.fixture(scope="function")
-def kafka_consumer_group(kafka_prefix: str):
- """Pick a random consumer group for kafka consumers on each call"""
- return "test-consumer-%s" % kafka_prefix
-
-
-@pytest.fixture(scope="session")
-def kafka_server() -> Iterator[str]:
- p = Producer({"test.mock.num.brokers": "1"})
-
- metadata = p.list_topics()
- brokers = [str(broker) for broker in metadata.brokers.values()]
- assert len(brokers) == 1, "More than one broker found in the kafka cluster?!"
-
- broker_connstr, broker_id = brokers[0].split("/")
- ip, port_str = broker_connstr.split(":")
- assert ip == "127.0.0.1"
- assert int(port_str)
-
- yield broker_connstr
-
- p.flush()
-
-
-TEST_CONFIG = {
- "consumer_id": "swh.journal.consumer",
- "object_types": TEST_OBJECT_DICTS.keys(),
- "stop_after_objects": 1, # will read 1 object and stop
- "storage": {"cls": "memory", "args": {}},
-}
-
-
-@pytest.fixture
-def test_config(kafka_server: str, kafka_prefix: str):
- """Test configuration needed for producer/consumer
-
- """
- return {
- **TEST_CONFIG,
- "brokers": [kafka_server],
- "prefix": kafka_prefix + ".swh.journal.objects",
- }
-
-
-@pytest.fixture
-def consumer(
- kafka_server: str, test_config: Dict, kafka_consumer_group: str,
-) -> Consumer:
- """Get a connected Kafka consumer.
-
- """
- consumer = Consumer(
- {
- "bootstrap.servers": kafka_server,
- "auto.offset.reset": "earliest",
- "enable.auto.commit": True,
- "group.id": kafka_consumer_group,
- }
- )
-
- kafka_topics = [
- "%s.%s" % (test_config["prefix"], object_type)
- for object_type in test_config["object_types"]
- ]
-
- consumer.subscribe(kafka_topics)
-
- yield consumer
-
- consumer.close()
-
def objects_d():
return one_of(
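The wildcard re-export at the top of the trimmed conftest.py keeps old import paths alive: test code that used to pull the sample data directly out of conftest.py continues to work. A quick check of that, assuming swh.journal is installed:

# Both paths reach the same objects; the conftest one works only
# because of the "for bw compat" wildcard re-export above.
from swh.journal.tests.conftest import TEST_OBJECTS as via_conftest
from swh.journal.tests.journal_data import TEST_OBJECTS as via_journal_data

assert via_conftest is via_journal_data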
diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/journal_data.py
copy from swh/journal/tests/conftest.py
copy to swh/journal/tests/journal_data.py
--- a/swh/journal/tests/conftest.py
+++ b/swh/journal/tests/journal_data.py
@@ -1,28 +1,17 @@
-# Copyright (C) 2019 The Software Heritage developers
+# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
-import pytest
-import logging
-import random
-import string
-from confluent_kafka import Consumer, Producer
+from typing import Any, Dict, List
-from hypothesis.strategies import one_of
-from typing import Any, Dict, Iterator, List
-
-from swh.model import hypothesis_strategies as strategies
from swh.model.hashutil import MultiHash, hash_to_bytes
-
from swh.journal.serializers import ModelObject
from swh.journal.writer.kafka import OBJECT_TYPES
-logger = logging.getLogger(__name__)
-
CONTENTS = [
{**MultiHash.from_data(b"foo").digest(), "length": 3, "status": "visible",},
]
@@ -159,94 +148,3 @@
converted_objects.append(model.from_dict(obj_d))
TEST_OBJECTS[object_type] = converted_objects
-
-
-@pytest.fixture(scope="function")
-def kafka_prefix():
- """Pick a random prefix for kafka topics on each call"""
- return "".join(random.choice(string.ascii_lowercase) for _ in range(10))
-
-
-@pytest.fixture(scope="function")
-def kafka_consumer_group(kafka_prefix: str):
- """Pick a random consumer group for kafka consumers on each call"""
- return "test-consumer-%s" % kafka_prefix
-
-
-@pytest.fixture(scope="session")
-def kafka_server() -> Iterator[str]:
- p = Producer({"test.mock.num.brokers": "1"})
-
- metadata = p.list_topics()
- brokers = [str(broker) for broker in metadata.brokers.values()]
- assert len(brokers) == 1, "More than one broker found in the kafka cluster?!"
-
- broker_connstr, broker_id = brokers[0].split("/")
- ip, port_str = broker_connstr.split(":")
- assert ip == "127.0.0.1"
- assert int(port_str)
-
- yield broker_connstr
-
- p.flush()
-
-
-TEST_CONFIG = {
- "consumer_id": "swh.journal.consumer",
- "object_types": TEST_OBJECT_DICTS.keys(),
- "stop_after_objects": 1, # will read 1 object and stop
- "storage": {"cls": "memory", "args": {}},
-}
-
-
-@pytest.fixture
-def test_config(kafka_server: str, kafka_prefix: str):
- """Test configuration needed for producer/consumer
-
- """
- return {
- **TEST_CONFIG,
- "brokers": [kafka_server],
- "prefix": kafka_prefix + ".swh.journal.objects",
- }
-
-
-@pytest.fixture
-def consumer(
- kafka_server: str, test_config: Dict, kafka_consumer_group: str,
-) -> Consumer:
- """Get a connected Kafka consumer.
-
- """
- consumer = Consumer(
- {
- "bootstrap.servers": kafka_server,
- "auto.offset.reset": "earliest",
- "enable.auto.commit": True,
- "group.id": kafka_consumer_group,
- }
- )
-
- kafka_topics = [
- "%s.%s" % (test_config["prefix"], object_type)
- for object_type in test_config["object_types"]
- ]
-
- consumer.subscribe(kafka_topics)
-
- yield consumer
-
- consumer.close()
-
-
-def objects_d():
- return one_of(
- strategies.origins().map(lambda x: ("origin", x.to_dict())),
- strategies.origin_visits().map(lambda x: ("origin_visit", x.to_dict())),
- strategies.snapshots().map(lambda x: ("snapshot", x.to_dict())),
- strategies.releases().map(lambda x: ("release", x.to_dict())),
- strategies.revisions().map(lambda x: ("revision", x.to_dict())),
- strategies.directories().map(lambda x: ("directory", x.to_dict())),
- strategies.skipped_contents().map(lambda x: ("skipped_content", x.to_dict())),
- strategies.present_contents().map(lambda x: ("content", x.to_dict())),
- )
diff --git a/swh/journal/tests/test_kafka_writer.py b/swh/journal/tests/test_kafka_writer.py
--- a/swh/journal/tests/test_kafka_writer.py
+++ b/swh/journal/tests/test_kafka_writer.py
@@ -3,77 +3,16 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from collections import defaultdict
-
import pytest
-from confluent_kafka import Consumer, Producer, KafkaException
+from confluent_kafka import Consumer, Producer
from swh.storage import get_storage
-from swh.journal.serializers import object_key, kafka_to_key, kafka_to_value
-from swh.journal.writer.kafka import KafkaJournalWriter, KafkaDeliveryError
-
from swh.model.model import Directory, Origin, OriginVisit
-from .conftest import TEST_OBJECTS, TEST_OBJECT_DICTS
-
-
-def consume_messages(consumer, kafka_prefix, expected_messages):
- """Consume expected_messages from the consumer;
- Sort them all into a consumed_objects dict"""
- consumed_messages = defaultdict(list)
-
- fetched_messages = 0
- retries_left = 1000
-
- while fetched_messages < expected_messages:
- if retries_left == 0:
- raise ValueError("Timed out fetching messages from kafka")
-
- msg = consumer.poll(timeout=0.01)
-
- if not msg:
- retries_left -= 1
- continue
-
- error = msg.error()
- if error is not None:
- if error.fatal():
- raise KafkaException(error)
- retries_left -= 1
- continue
-
- fetched_messages += 1
- topic = msg.topic()
- assert topic.startswith(kafka_prefix + "."), "Unexpected topic"
- object_type = topic[len(kafka_prefix + ".") :]
-
- consumed_messages[object_type].append(
- (kafka_to_key(msg.key()), kafka_to_value(msg.value()))
- )
-
- return consumed_messages
-
-
-def assert_all_objects_consumed(consumed_messages):
- """Check whether all objects from TEST_OBJECT_DICTS have been consumed"""
- for object_type, known_values in TEST_OBJECT_DICTS.items():
- known_keys = [object_key(object_type, obj) for obj in TEST_OBJECTS[object_type]]
-
- (received_keys, received_values) = zip(*consumed_messages[object_type])
-
- if object_type == "origin_visit":
- for value in received_values:
- del value["visit"]
- elif object_type == "content":
- for value in received_values:
- del value["ctime"]
-
- for key in known_keys:
- assert key in received_keys
-
- for value in known_values:
- assert value in received_values
+from swh.journal.tests.journal_data import TEST_OBJECTS
+from swh.journal.pytest_plugin import consume_messages, assert_all_objects_consumed
+from swh.journal.writer.kafka import KafkaJournalWriter, KafkaDeliveryError
def test_kafka_writer(kafka_prefix: str, kafka_server: str, consumer: Consumer):
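The body of test_kafka_writer is truncated in this diff. With the shared plugin helpers it boils down to writing TEST_OBJECTS through a KafkaJournalWriter and asserting the round trip. The sketch below shows that flow but is not the exact test body; the KafkaJournalWriter arguments are assumptions based on the imports above:

def test_kafka_writer(kafka_prefix: str, kafka_server: str, consumer: Consumer):
    # Sketch only; the writer arguments are assumed, not quoted from the diff.
    prefix = kafka_prefix + ".swh.journal.objects"  # matches the consumer fixture
    writer = KafkaJournalWriter(
        brokers=[kafka_server], client_id="kafka_writer", prefix=prefix,
    )

    expected_messages = 0
    for object_type, objects in TEST_OBJECTS.items():
        writer.write_additions(object_type, objects)
        expected_messages += len(objects)

    consumed = consume_messages(consumer, prefix, expected_messages)
    assert_all_objects_consumed(consumed)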