
diff --git a/setup.py b/setup.py
index eb30b87..7408fd4 100755
--- a/setup.py
+++ b/setup.py
@@ -1,71 +1,71 @@
#!/usr/bin/env python3
# Copyright (C) 2015-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from setuptools import setup, find_packages
-
-from os import path
from io import open
+from os import path
+
+from setuptools import find_packages, setup
here = path.abspath(path.dirname(__file__))
# Get the long description from the README file
with open(path.join(here, "README.md"), encoding="utf-8") as f:
long_description = f.read()
def parse_requirements(name=None):
if name:
reqf = "requirements-%s.txt" % name
else:
reqf = "requirements.txt"
requirements = []
if not path.exists(reqf):
return requirements
with open(reqf) as f:
for line in f.readlines():
line = line.strip()
if not line or line.startswith("#"):
continue
requirements.append(line)
return requirements
setup(
name="swh.journal",
description="Software Heritage Journal utilities",
long_description=long_description,
long_description_content_type="text/markdown",
python_requires=">=3.7",
author="Software Heritage developers",
author_email="swh-devel@inria.fr",
url="https://forge.softwareheritage.org/diffusion/DJNL/",
packages=find_packages(),
scripts=[],
entry_points="""
[pytest11]
pytest_swh_journal = swh.journal.pytest_plugin
""",
install_requires=parse_requirements() + parse_requirements("swh"),
setup_requires=["setuptools-scm"],
use_scm_version=True,
extras_require={"testing": parse_requirements("test")},
include_package_data=True,
classifiers=[
"Programming Language :: Python :: 3",
"Intended Audience :: Developers",
"License :: OSI Approved :: GNU General Public License v3 (GPLv3)",
"Operating System :: OS Independent",
"Development Status :: 5 - Production/Stable",
],
project_urls={
"Bug Reports": "https://forge.softwareheritage.org/maniphest",
"Funding": "https://www.softwareheritage.org/donate",
"Source": "https://forge.softwareheritage.org/source/swh-journal",
"Documentation": "https://docs.softwareheritage.org/devel/swh-journal/",
},
)
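
Most hunks in this diff only reorder imports. As a minimal sketch, the new ordering appears to follow an isort-style grouping (standard library, then third-party, then first-party swh packages, then relative imports); the module names below are taken from the client.py hunk that follows:

# 1. standard library
from collections import defaultdict
import logging

# 2. third-party
from confluent_kafka import Consumer, KafkaError, KafkaException

# 3. first-party (swh.*)
from swh.journal import DEFAULT_PREFIX

# 4. local / relative
from .serializers import kafka_to_value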
diff --git a/swh/journal/client.py b/swh/journal/client.py
index 68b9a22..704a2fe 100644
--- a/swh/journal/client.py
+++ b/swh/journal/client.py
@@ -1,305 +1,306 @@
# Copyright (C) 2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from collections import defaultdict
import logging
import os
import time
from typing import Any, Dict, List, Optional, Set, Tuple, Union
-from confluent_kafka import Consumer, KafkaException, KafkaError
+from confluent_kafka import Consumer, KafkaError, KafkaException
-from .serializers import kafka_to_value
from swh.journal import DEFAULT_PREFIX
+from .serializers import kafka_to_value
+
logger = logging.getLogger(__name__)
rdkafka_logger = logging.getLogger(__name__ + ".rdkafka")
# Accepted offset reset policies
ACCEPTED_OFFSET_RESET = ["earliest", "latest"]
# Errors that Kafka raises too often and that are not useful; we lower
# their log level to DEBUG instead of INFO.
_SPAMMY_ERRORS = [
KafkaError._NO_OFFSET,
]
def get_journal_client(cls: str, **kwargs: Any):
"""Factory function to instantiate a journal client object.
Currently, only the "kafka" journal client is supported.
"""
if cls == "kafka":
return JournalClient(**kwargs)
raise ValueError("Unknown journal client class `%s`" % cls)
def _error_cb(error):
if error.fatal():
raise KafkaException(error)
if error.code() in _SPAMMY_ERRORS:
logger.debug("Received non-fatal kafka error: %s", error)
else:
logger.info("Received non-fatal kafka error: %s", error)
def _on_commit(error, partitions):
if error is not None:
_error_cb(error)
class JournalClient:
"""A base client for the Software Heritage journal.
The current implementation of the journal uses Apache Kafka
brokers to publish messages under a given topic prefix, with each
object type using a specific topic under that prefix. If the `prefix`
argument is None (default value), it will take the default value
`'swh.journal.objects'`.
Clients subscribe to events specific to each object type as listed in the
`object_types` argument (if unset, defaults to all existing kafka topics under
the prefix).
Clients can be sharded by setting the `group_id` to a common
value across instances. The journal will share the message
throughput across the nodes sharing the same group_id.
Messages are processed by the `worker_fn` callback passed to the `process`
method, in batches of maximum `batch_size` messages (defaults to 200).
If set, the processing stops after processing `stop_after_objects` messages
in total.
`stop_on_eof` stops the processing when the client has reached the end of
each partition in turn.
`auto_offset_reset` sets the behavior of the client when the consumer group
initializes: `'earliest'` (the default) processes all objects since the
inception of the topics; `'latest'` only processes objects published after the
consumer group was created.
Any other named argument is passed directly to KafkaConsumer().
"""
def __init__(
self,
brokers: Union[str, List[str]],
group_id: str,
prefix: Optional[str] = None,
object_types: Optional[List[str]] = None,
privileged: bool = False,
stop_after_objects: Optional[int] = None,
batch_size: int = 200,
process_timeout: Optional[float] = None,
auto_offset_reset: str = "earliest",
stop_on_eof: bool = False,
**kwargs,
):
if prefix is None:
prefix = DEFAULT_PREFIX
if auto_offset_reset not in ACCEPTED_OFFSET_RESET:
raise ValueError(
"Option 'auto_offset_reset' only accept %s, not %s"
% (ACCEPTED_OFFSET_RESET, auto_offset_reset)
)
if batch_size <= 0:
raise ValueError("Option 'batch_size' needs to be positive")
self.value_deserializer = kafka_to_value
if isinstance(brokers, str):
brokers = [brokers]
debug_logging = rdkafka_logger.isEnabledFor(logging.DEBUG)
if debug_logging and "debug" not in kwargs:
kwargs["debug"] = "consumer"
# Static group instance id management
group_instance_id = os.environ.get("KAFKA_GROUP_INSTANCE_ID")
if group_instance_id:
kwargs["group.instance.id"] = group_instance_id
if "group.instance.id" in kwargs:
# When doing static consumer group membership, set a higher default
# session timeout. The session timeout is the duration after which
# the broker considers that a consumer has left the consumer group
# for good, and triggers a rebalance. Considering our current
# processing pattern, 10 minutes gives the consumer ample time to
# restart before that happens.
if "session.timeout.ms" not in kwargs:
kwargs["session.timeout.ms"] = 10 * 60 * 1000 # 10 minutes
if "session.timeout.ms" in kwargs:
# When the session timeout is set, rdkafka requires the max poll
# interval to be set to a higher value; the max poll interval is
# rdkafka's way of figuring out whether the client's message
# processing thread has stalled: when the max poll interval lapses
# between two calls to consumer.poll(), rdkafka leaves the consumer
# group and terminates the connection to the brokers.
#
# We default to 1.5 times the session timeout
if "max.poll.interval.ms" not in kwargs:
kwargs["max.poll.interval.ms"] = kwargs["session.timeout.ms"] // 2 * 3
consumer_settings = {
**kwargs,
"bootstrap.servers": ",".join(brokers),
"auto.offset.reset": auto_offset_reset,
"group.id": group_id,
"on_commit": _on_commit,
"error_cb": _error_cb,
"enable.auto.commit": False,
"logger": rdkafka_logger,
}
self.stop_on_eof = stop_on_eof
if self.stop_on_eof:
consumer_settings["enable.partition.eof"] = True
logger.debug("Consumer settings: %s", consumer_settings)
self.consumer = Consumer(consumer_settings)
if privileged:
privileged_prefix = f"{prefix}_privileged"
else: # do not attempt to subscribe to privileged topics
privileged_prefix = f"{prefix}"
existing_topics = [
topic
for topic in self.consumer.list_topics(timeout=10).topics.keys()
if (
topic.startswith(f"{prefix}.")
or topic.startswith(f"{privileged_prefix}.")
)
]
if not existing_topics:
raise ValueError(
f"The prefix {prefix} does not match any existing topic "
"on the kafka broker"
)
if not object_types:
object_types = list({topic.split(".")[-1] for topic in existing_topics})
self.subscription = []
unknown_types = []
for object_type in object_types:
topics = (f"{privileged_prefix}.{object_type}", f"{prefix}.{object_type}")
for topic in topics:
if topic in existing_topics:
self.subscription.append(topic)
break
else:
unknown_types.append(object_type)
if unknown_types:
raise ValueError(
f"Topic(s) for object types {','.join(unknown_types)} "
"are unknown on the kafka broker"
)
logger.debug(f"Upstream topics: {existing_topics}")
self.subscribe()
self.stop_after_objects = stop_after_objects
self.process_timeout = process_timeout
self.eof_reached: Set[Tuple[str, str]] = set()
self.batch_size = batch_size
def subscribe(self):
"""Subscribe to topics listed in self.subscription
This can be overridden if you need, for instance, to manually assign partitions.
"""
logger.debug(f"Subscribing to: {self.subscription}")
self.consumer.subscribe(topics=self.subscription)
def process(self, worker_fn):
"""Polls Kafka for a batch of messages, and calls the worker_fn
with these messages.
Args:
worker_fn Callable[Dict[str, List[dict]]]: Function called with the messages as argument.
"""
start_time = time.monotonic()
total_objects_processed = 0
while True:
# timeout for message poll
timeout = 1.0
elapsed = time.monotonic() - start_time
if self.process_timeout:
# +0.01 to prevent busy-waiting on / spamming consumer.poll.
# consumer.consume() returns shortly before the timeout expires
# (a matter of milliseconds), so after it returns a first
# time, it would then be called with a timeout in the order
# of milliseconds, therefore returning immediately, then be
# called again, etc.
if elapsed + 0.01 >= self.process_timeout:
break
timeout = self.process_timeout - elapsed
batch_size = self.batch_size
if self.stop_after_objects:
if total_objects_processed >= self.stop_after_objects:
break
# clamp batch size to avoid overrunning stop_after_objects
batch_size = min(
self.stop_after_objects - total_objects_processed, batch_size,
)
messages = self.consumer.consume(timeout=timeout, num_messages=batch_size)
if not messages:
continue
batch_processed, at_eof = self.handle_messages(messages, worker_fn)
total_objects_processed += batch_processed
if at_eof:
break
return total_objects_processed
def handle_messages(self, messages, worker_fn):
objects: Dict[str, List[Any]] = defaultdict(list)
nb_processed = 0
for message in messages:
error = message.error()
if error is not None:
if error.code() == KafkaError._PARTITION_EOF:
self.eof_reached.add((message.topic(), message.partition()))
else:
_error_cb(error)
continue
if message.value() is None:
# ignore messages with no payload; these can be generated in tests
continue
nb_processed += 1
object_type = message.topic().split(".")[-1]
objects[object_type].append(self.deserialize_message(message))
if objects:
worker_fn(dict(objects))
self.consumer.commit()
at_eof = self.stop_on_eof and all(
(tp.topic, tp.partition) in self.eof_reached
for tp in self.consumer.assignment()
)
return nb_processed, at_eof
def deserialize_message(self, message):
return self.value_deserializer(message.value())
def close(self):
self.consumer.close()
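
For context, here is a minimal sketch of how this client is typically driven, based on the JournalClient docstring and the tests later in this diff; the broker address, group id, object types and counts are placeholder values:

from swh.journal.client import get_journal_client

# Build a journal client; "kafka" is currently the only supported class.
# Setting the KAFKA_GROUP_INSTANCE_ID environment variable would additionally
# enable static consumer group membership (with the 10-minute default session
# timeout implemented above).
client = get_journal_client(
    "kafka",
    brokers=["kafka:9092"],        # placeholder broker address
    group_id="example-consumer",   # instances sharing this id share the throughput
    prefix="swh.journal.objects",  # the default topic prefix
    object_types=["revision"],     # subscribe to a subset of the topics
    stop_after_objects=1000,       # stop once this many messages were processed
)

def worker_fn(objects):
    # `objects` maps an object type to the list of deserialized messages of the
    # current batch, e.g. {"revision": [{...}, ...]}.
    for object_type, values in objects.items():
        print(object_type, len(values))

client.process(worker_fn)
client.close()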
diff --git a/swh/journal/pytest_plugin.py b/swh/journal/pytest_plugin.py
index 778eeef..c1025e6 100644
--- a/swh/journal/pytest_plugin.py
+++ b/swh/journal/pytest_plugin.py
@@ -1,241 +1,239 @@
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+from collections import defaultdict
import random
import string
-
from typing import Collection, Dict, Iterator, Optional
-from collections import defaultdict
import attr
-import pytest
-
from confluent_kafka import Consumer, KafkaException, Producer
from confluent_kafka.admin import AdminClient
+import pytest
-from swh.journal.serializers import object_key, kafka_to_key, kafka_to_value, pprint_key
+from swh.journal.serializers import kafka_to_key, kafka_to_value, object_key, pprint_key
from swh.journal.tests.journal_data import TEST_OBJECTS
def consume_messages(consumer, kafka_prefix, expected_messages):
"""Consume expected_messages from the consumer;
sort them all into a consumed_messages dict, keyed by object type"""
consumed_messages = defaultdict(list)
fetched_messages = 0
retries_left = 1000
while fetched_messages < expected_messages:
if retries_left == 0:
raise ValueError(
"Timed out fetching messages from kafka. "
f"Only {fetched_messages}/{expected_messages} fetched"
)
msg = consumer.poll(timeout=0.01)
if not msg:
retries_left -= 1
continue
error = msg.error()
if error is not None:
if error.fatal():
raise KafkaException(error)
retries_left -= 1
continue
fetched_messages += 1
topic = msg.topic()
assert topic.startswith(f"{kafka_prefix}.") or topic.startswith(
f"{kafka_prefix}_privileged."
), "Unexpected topic"
object_type = topic[len(kafka_prefix + ".") :]
consumed_messages[object_type].append(
(kafka_to_key(msg.key()), kafka_to_value(msg.value()))
)
return consumed_messages
def assert_all_objects_consumed(
consumed_messages: Dict, exclude: Optional[Collection] = None
):
"""Check whether all objects from TEST_OBJECTS have been consumed
`exclude` can be a list of object types for which we do not want to compare the
values (e.g. for anonymized objects).
"""
for object_type, known_objects in TEST_OBJECTS.items():
known_keys = [object_key(object_type, obj) for obj in known_objects]
if not consumed_messages[object_type]:
return
(received_keys, received_values) = zip(*consumed_messages[object_type])
if object_type in ("content", "skipped_content"):
for value in received_values:
value.pop("ctime", None)
if object_type == "content":
known_objects = [attr.evolve(o, data=None) for o in known_objects]
for key in known_keys:
assert key in received_keys, (
f"expected {object_type} key {pprint_key(key)} "
"absent from consumed messages"
)
if exclude and object_type in exclude:
continue
for value in known_objects:
expected_value = value.to_dict()
if value.object_type in ("content", "skipped_content"):
expected_value.pop("ctime", None)
assert expected_value in received_values, (
f"expected {object_type} value {value!r} is "
"absent from consumed messages"
)
@pytest.fixture(scope="function")
def kafka_prefix():
"""Pick a random prefix for kafka topics on each call"""
return "".join(random.choice(string.ascii_lowercase) for _ in range(10))
@pytest.fixture(scope="function")
def kafka_consumer_group(kafka_prefix: str):
"""Pick a random consumer group for kafka consumers on each call"""
return "test-consumer-%s" % kafka_prefix
@pytest.fixture(scope="function")
def object_types():
"""Set of object types to precreate topics for."""
return set(TEST_OBJECTS.keys())
@pytest.fixture(scope="function")
def privileged_object_types():
"""Set of object types to precreate privileged topics for."""
return {"revision", "release"}
@pytest.fixture(scope="function")
def kafka_server(
kafka_server_base: str,
kafka_prefix: str,
object_types: Iterator[str],
privileged_object_types: Iterator[str],
) -> str:
"""A kafka server with existing topics
Unprivileged topics are built as ``{kafka_prefix}.{object_type}`` with object_type
from the ``object_types`` list.
Privileged topics are built as ``{kafka_prefix}_privileged.{object_type}`` with
object_type from the ``privileged_object_types`` list.
"""
topics = [f"{kafka_prefix}.{obj}" for obj in object_types] + [
f"{kafka_prefix}_privileged.{obj}" for obj in privileged_object_types
]
# unfortunately, the Mock broker does not support the CreateTopic admin API, so we
# have to create topics using a Producer.
producer = Producer(
{
"bootstrap.servers": kafka_server_base,
"client.id": "bootstrap producer",
"acks": "all",
}
)
for topic in topics:
producer.produce(topic=topic, value=None)
for i in range(10):
if producer.flush(0.1) == 0:
break
return kafka_server_base
@pytest.fixture(scope="session")
def kafka_server_base() -> Iterator[str]:
"""Create a mock kafka cluster suitable for tests.
Yield a connection string.
Note: this is a generator to keep the mock broker alive during the whole test
session.
see https://github.com/edenhill/librdkafka/blob/master/src/rdkafka_mock.h
"""
admin = AdminClient({"test.mock.num.brokers": "1"})
metadata = admin.list_topics()
brokers = [str(broker) for broker in metadata.brokers.values()]
assert len(brokers) == 1, "More than one broker found in the kafka cluster?!"
broker_connstr, broker_id = brokers[0].split("/")
yield broker_connstr
TEST_CONFIG = {
"consumer_id": "swh.journal.consumer",
"stop_after_objects": 1, # will read 1 object and stop
"storage": {"cls": "memory", "args": {}},
}
@pytest.fixture
def test_config(
kafka_server_base: str,
kafka_prefix: str,
object_types: Iterator[str],
privileged_object_types: Iterator[str],
):
"""Test configuration needed for producer/consumer
"""
return {
**TEST_CONFIG,
"object_types": object_types,
"privileged_object_types": privileged_object_types,
"brokers": [kafka_server_base],
"prefix": kafka_prefix,
}
@pytest.fixture
def consumer(
kafka_server: str, test_config: Dict, kafka_consumer_group: str
) -> Consumer:
"""Get a connected Kafka consumer.
"""
consumer = Consumer(
{
"bootstrap.servers": kafka_server,
"auto.offset.reset": "earliest",
"enable.auto.commit": True,
"group.id": kafka_consumer_group,
}
)
prefix = test_config["prefix"]
kafka_topics = [
f"{prefix}.{object_type}" for object_type in test_config["object_types"]
] + [
f"{prefix}_privileged.{object_type}"
for object_type in test_config["privileged_object_types"]
]
consumer.subscribe(kafka_topics)
yield consumer
consumer.close()
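
A minimal sketch of a test leaning on these fixtures, modeled on test_kafka_writer later in this diff; the test name is illustrative only:

from confluent_kafka import Consumer

from swh.journal.pytest_plugin import assert_all_objects_consumed, consume_messages
from swh.journal.tests.journal_data import TEST_OBJECTS
from swh.journal.writer.kafka import KafkaJournalWriter


def test_everything_gets_consumed(
    kafka_prefix: str, kafka_server: str, consumer: Consumer
):
    # kafka_server pre-creates the topics under kafka_prefix; the consumer
    # fixture is already subscribed to them.
    writer = KafkaJournalWriter(
        brokers=[kafka_server], client_id="test writer", prefix=kafka_prefix,
    )
    expected = 0
    for object_type, objects in TEST_OBJECTS.items():
        writer.write_additions(object_type, objects)
        expected += len(objects)
    consumed = consume_messages(consumer, kafka_prefix, expected)
    assert_all_objects_consumed(consumed)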
diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py
index ec2a3cf..39670ed 100644
--- a/swh/journal/tests/conftest.py
+++ b/swh/journal/tests/conftest.py
@@ -1,28 +1,27 @@
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
from hypothesis.strategies import one_of
-from swh.model import hypothesis_strategies as strategies
-
# for bw compat
from swh.journal.tests.journal_data import * # noqa
+from swh.model import hypothesis_strategies as strategies
logger = logging.getLogger(__name__)
def objects_d():
return one_of(
strategies.origins().map(lambda x: ("origin", x.to_dict())),
strategies.origin_visits().map(lambda x: ("origin_visit", x.to_dict())),
strategies.snapshots().map(lambda x: ("snapshot", x.to_dict())),
strategies.releases().map(lambda x: ("release", x.to_dict())),
strategies.revisions().map(lambda x: ("revision", x.to_dict())),
strategies.directories().map(lambda x: ("directory", x.to_dict())),
strategies.skipped_contents().map(lambda x: ("skipped_content", x.to_dict())),
strategies.present_contents().map(lambda x: ("content", x.to_dict())),
)
diff --git a/swh/journal/tests/journal_data.py b/swh/journal/tests/journal_data.py
index 336f5ac..f891293 100644
--- a/swh/journal/tests/journal_data.py
+++ b/swh/journal/tests/journal_data.py
@@ -1,343 +1,341 @@
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
-
from typing import Dict, Sequence
import attr
-from swh.model.hashutil import MultiHash, hash_to_bytes, hash_to_hex
from swh.journal.serializers import ModelObject
-
+from swh.model.hashutil import MultiHash, hash_to_bytes, hash_to_hex
from swh.model.identifiers import SWHID
from swh.model.model import (
Content,
Directory,
DirectoryEntry,
MetadataAuthority,
MetadataAuthorityType,
MetadataFetcher,
MetadataTargetType,
ObjectType,
Origin,
OriginVisit,
OriginVisitStatus,
Person,
RawExtrinsicMetadata,
Release,
Revision,
RevisionType,
SkippedContent,
Snapshot,
SnapshotBranch,
TargetType,
Timestamp,
TimestampWithTimezone,
)
UTC = datetime.timezone.utc
CONTENTS = [
Content(
length=4,
data=f"foo{i}".encode(),
status="visible",
**MultiHash.from_data(f"foo{i}".encode()).digest(),
)
for i in range(10)
] + [
Content(
length=14,
data=f"forbidden foo{i}".encode(),
status="hidden",
**MultiHash.from_data(f"forbidden foo{i}".encode()).digest(),
)
for i in range(10)
]
SKIPPED_CONTENTS = [
SkippedContent(
length=4,
status="absent",
reason=f"because chr({i}) != '*'",
**MultiHash.from_data(f"bar{i}".encode()).digest(),
)
for i in range(2)
]
duplicate_content1 = Content(
length=4,
sha1=hash_to_bytes("44973274ccef6ab4dfaaf86599792fa9c3fe4689"),
sha1_git=b"another-foo",
blake2s256=b"another-bar",
sha256=b"another-baz",
status="visible",
)
# Craft a sha1 collision
sha1_array = bytearray(duplicate_content1.sha1_git)
sha1_array[0] += 1
duplicate_content2 = attr.evolve(duplicate_content1, sha1_git=bytes(sha1_array))
DUPLICATE_CONTENTS = [duplicate_content1, duplicate_content2]
COMMITTERS = [
Person(fullname=b"foo", name=b"foo", email=b""),
Person(fullname=b"bar", name=b"bar", email=b""),
]
DATES = [
TimestampWithTimezone(
timestamp=Timestamp(seconds=1234567891, microseconds=0,),
offset=120,
negative_utc=False,
),
TimestampWithTimezone(
timestamp=Timestamp(seconds=1234567892, microseconds=0,),
offset=120,
negative_utc=False,
),
]
REVISIONS = [
Revision(
id=hash_to_bytes("4ca486e65eb68e4986aeef8227d2db1d56ce51b3"),
message=b"hello",
date=DATES[0],
committer=COMMITTERS[0],
author=COMMITTERS[0],
committer_date=DATES[0],
type=RevisionType.GIT,
directory=b"\x01" * 20,
synthetic=False,
metadata=None,
parents=(),
),
Revision(
id=hash_to_bytes("677063f5c405d6fc1781fc56379c9a9adf43d3a0"),
message=b"hello again",
date=DATES[1],
committer=COMMITTERS[1],
author=COMMITTERS[1],
committer_date=DATES[1],
type=RevisionType.MERCURIAL,
directory=b"\x02" * 20,
synthetic=False,
metadata=None,
parents=(),
extra_headers=((b"foo", b"bar"),),
),
]
RELEASES = [
Release(
id=hash_to_bytes("8059dc4e17fcd0e51ca3bcd6b80f4577d281fd08"),
name=b"v0.0.1",
date=TimestampWithTimezone(
timestamp=Timestamp(seconds=1234567890, microseconds=0,),
offset=120,
negative_utc=False,
),
author=COMMITTERS[0],
target_type=ObjectType.REVISION,
target=b"\x04" * 20,
message=b"foo",
synthetic=False,
),
]
ORIGINS = [
Origin(url="https://somewhere.org/den/fox",),
Origin(url="https://overtherainbow.org/fox/den",),
]
ORIGIN_VISITS = [
OriginVisit(
origin=ORIGINS[0].url,
date=datetime.datetime(2013, 5, 7, 4, 20, 39, 369271, tzinfo=UTC),
visit=1,
type="git",
),
OriginVisit(
origin=ORIGINS[1].url,
date=datetime.datetime(2014, 11, 27, 17, 20, 39, tzinfo=UTC),
visit=1,
type="hg",
),
OriginVisit(
origin=ORIGINS[0].url,
date=datetime.datetime(2018, 11, 27, 17, 20, 39, tzinfo=UTC),
visit=2,
type="git",
),
OriginVisit(
origin=ORIGINS[0].url,
date=datetime.datetime(2018, 11, 27, 17, 20, 39, tzinfo=UTC),
visit=3,
type="git",
),
OriginVisit(
origin=ORIGINS[1].url,
date=datetime.datetime(2015, 11, 27, 17, 20, 39, tzinfo=UTC),
visit=2,
type="hg",
),
]
# The origin-visit-status dates need to be shifted slightly into the future relative
# to their origin-visit counterparts. Otherwise we hit the storage's "on conflict
# ignore" policy, because origin-visit-add creates an origin-visit-status with the
# same parameters ({origin, visit, date}) as the origin-visit.
ORIGIN_VISIT_STATUSES = [
OriginVisitStatus(
origin=ORIGINS[0].url,
date=datetime.datetime(2013, 5, 7, 4, 20, 39, 432222, tzinfo=UTC),
visit=1,
status="ongoing",
snapshot=None,
metadata=None,
),
OriginVisitStatus(
origin=ORIGINS[1].url,
date=datetime.datetime(2014, 11, 27, 17, 21, 12, tzinfo=UTC),
visit=1,
status="ongoing",
snapshot=None,
metadata=None,
),
OriginVisitStatus(
origin=ORIGINS[0].url,
date=datetime.datetime(2018, 11, 27, 17, 20, 59, tzinfo=UTC),
visit=2,
status="ongoing",
snapshot=None,
metadata=None,
),
OriginVisitStatus(
origin=ORIGINS[0].url,
date=datetime.datetime(2018, 11, 27, 17, 20, 49, tzinfo=UTC),
visit=3,
status="full",
snapshot=hash_to_bytes("17d0066a4a80aba4a0e913532ee8ff2014f006a9"),
metadata=None,
),
OriginVisitStatus(
origin=ORIGINS[1].url,
date=datetime.datetime(2015, 11, 27, 17, 22, 18, tzinfo=UTC),
visit=2,
status="partial",
snapshot=hash_to_bytes("8ce268b87faf03850693673c3eb5c9bb66e1ca38"),
metadata=None,
),
]
DIRECTORIES = [
Directory(id=hash_to_bytes("4b825dc642cb6eb9a060e54bf8d69288fbee4904"), entries=()),
Directory(
id=hash_to_bytes("21416d920e0ebf0df4a7888bed432873ed5cb3a7"),
entries=(
DirectoryEntry(
name=b"file1.ext",
perms=0o644,
type="file",
target=CONTENTS[0].sha1_git,
),
DirectoryEntry(
name=b"dir1",
perms=0o755,
type="dir",
target=hash_to_bytes("4b825dc642cb6eb9a060e54bf8d69288fbee4904"),
),
DirectoryEntry(
name=b"subprepo1", perms=0o160000, type="rev", target=REVISIONS[1].id,
),
),
),
]
SNAPSHOTS = [
Snapshot(
id=hash_to_bytes("17d0066a4a80aba4a0e913532ee8ff2014f006a9"),
branches={
b"master": SnapshotBranch(
target_type=TargetType.REVISION, target=REVISIONS[0].id
)
},
),
Snapshot(
id=hash_to_bytes("8ce268b87faf03850693673c3eb5c9bb66e1ca38"),
branches={
b"target/revision": SnapshotBranch(
target_type=TargetType.REVISION, target=REVISIONS[0].id,
),
b"target/alias": SnapshotBranch(
target_type=TargetType.ALIAS, target=b"target/revision"
),
b"target/directory": SnapshotBranch(
target_type=TargetType.DIRECTORY, target=DIRECTORIES[0].id,
),
b"target/release": SnapshotBranch(
target_type=TargetType.RELEASE, target=RELEASES[0].id
),
b"target/snapshot": SnapshotBranch(
target_type=TargetType.SNAPSHOT,
target=hash_to_bytes("17d0066a4a80aba4a0e913532ee8ff2014f006a9"),
),
},
),
]
METADATA_AUTHORITIES = [
MetadataAuthority(
type=MetadataAuthorityType.FORGE, url="http://example.org/", metadata={},
),
]
METADATA_FETCHERS = [
MetadataFetcher(name="test-fetcher", version="1.0.0", metadata={},)
]
RAW_EXTRINSIC_METADATA = [
RawExtrinsicMetadata(
type=MetadataTargetType.ORIGIN,
id="http://example.org/foo.git",
discovery_date=datetime.datetime(2020, 7, 30, 17, 8, 20, tzinfo=UTC),
authority=attr.evolve(METADATA_AUTHORITIES[0], metadata=None),
fetcher=attr.evolve(METADATA_FETCHERS[0], metadata=None),
format="json",
metadata=b'{"foo": "bar"}',
),
RawExtrinsicMetadata(
type=MetadataTargetType.CONTENT,
id=SWHID(object_type="content", object_id=hash_to_hex(CONTENTS[0].sha1_git)),
discovery_date=datetime.datetime(2020, 7, 30, 17, 8, 20, tzinfo=UTC),
authority=attr.evolve(METADATA_AUTHORITIES[0], metadata=None),
fetcher=attr.evolve(METADATA_FETCHERS[0], metadata=None),
format="json",
metadata=b'{"foo": "bar"}',
),
]
TEST_OBJECTS: Dict[str, Sequence[ModelObject]] = {
"content": CONTENTS,
"directory": DIRECTORIES,
"metadata_authority": METADATA_AUTHORITIES,
"metadata_fetcher": METADATA_FETCHERS,
"origin": ORIGINS,
"origin_visit": ORIGIN_VISITS,
"origin_visit_status": ORIGIN_VISIT_STATUSES,
"raw_extrinsic_metadata": RAW_EXTRINSIC_METADATA,
"release": RELEASES,
"revision": REVISIONS,
"snapshot": SNAPSHOTS,
"skipped_content": SKIPPED_CONTENTS,
}
diff --git a/swh/journal/tests/test_client.py b/swh/journal/tests/test_client.py
index 66278b8..3984c46 100644
--- a/swh/journal/tests/test_client.py
+++ b/swh/journal/tests/test_client.py
@@ -1,330 +1,329 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from typing import Dict, List
from unittest.mock import MagicMock
from confluent_kafka import Producer
import pytest
-from swh.model.model import Content
-
from swh.journal.client import JournalClient
from swh.journal.serializers import key_to_kafka, value_to_kafka
+from swh.model.model import Content
REV = {
"message": b"something cool",
"author": {"fullname": b"Peter", "name": None, "email": b"peter@ouiche.lo"},
"committer": {"fullname": b"Stephen", "name": b"From Outer Space", "email": None},
"date": {
"timestamp": {"seconds": 123456789, "microseconds": 123},
"offset": 120,
"negative_utc": False,
},
"committer_date": {
"timestamp": {"seconds": 123123456, "microseconds": 0},
"offset": 0,
"negative_utc": False,
},
"type": "git",
"directory": (
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
b"\x01\x02\x03\x04\x05"
),
"synthetic": False,
"metadata": None,
"parents": (),
"id": b"\x8b\xeb\xd1\x9d\x07\xe2\x1e0\xe2 \x91X\x8d\xbd\x1c\xa8\x86\xdeB\x0c",
}
def test_client(kafka_prefix: str, kafka_consumer_group: str, kafka_server: str):
producer = Producer(
{
"bootstrap.servers": kafka_server,
"client.id": "test producer",
"acks": "all",
}
)
# Fill Kafka
producer.produce(
topic=kafka_prefix + ".revision", key=REV["id"], value=value_to_kafka(REV),
)
producer.flush()
client = JournalClient(
brokers=[kafka_server],
group_id=kafka_consumer_group,
prefix=kafka_prefix,
stop_after_objects=1,
)
worker_fn = MagicMock()
client.process(worker_fn)
worker_fn.assert_called_once_with({"revision": [REV]})
def test_client_eof(kafka_prefix: str, kafka_consumer_group: str, kafka_server: str):
producer = Producer(
{
"bootstrap.servers": kafka_server,
"client.id": "test producer",
"acks": "all",
}
)
# Fill Kafka
producer.produce(
topic=kafka_prefix + ".revision", key=REV["id"], value=value_to_kafka(REV),
)
producer.flush()
client = JournalClient(
brokers=[kafka_server],
group_id=kafka_consumer_group,
prefix=kafka_prefix,
stop_after_objects=None,
stop_on_eof=True,
)
worker_fn = MagicMock()
client.process(worker_fn)
worker_fn.assert_called_once_with({"revision": [REV]})
@pytest.mark.parametrize("batch_size", [1, 5, 100])
def test_client_batch_size(
kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, batch_size: int,
):
num_objects = 2 * batch_size + 1
assert num_objects < 256, "Too many objects, generation will fail"
producer = Producer(
{
"bootstrap.servers": kafka_server,
"client.id": "test producer",
"acks": "all",
}
)
contents = [Content.from_data(bytes([i])) for i in range(num_objects)]
# Fill Kafka
for content in contents:
producer.produce(
topic=kafka_prefix + ".content",
key=key_to_kafka(content.sha1),
value=value_to_kafka(content.to_dict()),
)
producer.flush()
client = JournalClient(
brokers=[kafka_server],
group_id=kafka_consumer_group,
prefix=kafka_prefix,
stop_after_objects=num_objects,
batch_size=batch_size,
)
collected_output: List[Dict] = []
def worker_fn(objects):
received = objects["content"]
assert len(received) <= batch_size
collected_output.extend(received)
client.process(worker_fn)
expected_output = [content.to_dict() for content in contents]
assert len(collected_output) == len(expected_output)
for output in collected_output:
assert output in expected_output
@pytest.fixture()
def kafka_producer(kafka_prefix: str, kafka_server_base: str):
producer = Producer(
{
"bootstrap.servers": kafka_server_base,
"client.id": "test producer",
"acks": "all",
}
)
# Fill Kafka
producer.produce(
topic=kafka_prefix + ".something",
key=key_to_kafka(b"key1"),
value=value_to_kafka("value1"),
)
producer.produce(
topic=kafka_prefix + ".else",
key=key_to_kafka(b"key1"),
value=value_to_kafka("value2"),
)
producer.flush()
return producer
def test_client_subscribe_all(
kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str
):
client = JournalClient(
brokers=[kafka_server_base],
group_id="whatever",
prefix=kafka_prefix,
stop_after_objects=2,
)
assert set(client.subscription) == {
f"{kafka_prefix}.something",
f"{kafka_prefix}.else",
}
worker_fn = MagicMock()
client.process(worker_fn)
worker_fn.assert_called_once_with(
{"something": ["value1"], "else": ["value2"],}
)
def test_client_subscribe_one_topic(
kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str
):
client = JournalClient(
brokers=[kafka_server_base],
group_id="whatever",
prefix=kafka_prefix,
stop_after_objects=1,
object_types=["else"],
)
assert client.subscription == [f"{kafka_prefix}.else"]
worker_fn = MagicMock()
client.process(worker_fn)
worker_fn.assert_called_once_with({"else": ["value2"]})
def test_client_subscribe_absent_topic(
kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str
):
with pytest.raises(ValueError):
JournalClient(
brokers=[kafka_server_base],
group_id="whatever",
prefix=kafka_prefix,
stop_after_objects=1,
object_types=["really"],
)
def test_client_subscribe_absent_prefix(
kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str
):
with pytest.raises(ValueError):
JournalClient(
brokers=[kafka_server_base],
group_id="whatever",
prefix="wrong.prefix",
stop_after_objects=1,
)
with pytest.raises(ValueError):
JournalClient(
brokers=[kafka_server_base],
group_id="whatever",
prefix="wrong.prefix",
stop_after_objects=1,
object_types=["else"],
)
def test_client_subscriptions_with_anonymized_topics(
kafka_prefix: str, kafka_consumer_group: str, kafka_server_base: str
):
producer = Producer(
{
"bootstrap.servers": kafka_server_base,
"client.id": "test producer",
"acks": "all",
}
)
# Fill Kafka with a revision object on both the regular prefix (which normally
# carries anonymized objects in this case) and the privileged one
producer.produce(
topic=kafka_prefix + ".revision", key=REV["id"], value=value_to_kafka(REV),
)
producer.produce(
topic=kafka_prefix + "_privileged.revision",
key=REV["id"],
value=value_to_kafka(REV),
)
producer.flush()
# without privileged "channels" activated on the client side
client = JournalClient(
brokers=[kafka_server_base],
group_id=kafka_consumer_group,
prefix=kafka_prefix,
stop_after_objects=1,
privileged=False,
)
# we only subscribed to "standard" topics
assert client.subscription == [kafka_prefix + ".revision"]
# with privileged "channels" activated on the client side
client = JournalClient(
brokers=[kafka_server_base],
group_id=kafka_consumer_group,
prefix=kafka_prefix,
stop_after_objects=1,
privileged=True,
)
# we only subscribed to "privileged" topics
assert client.subscription == [kafka_prefix + "_privileged.revision"]
def test_client_subscriptions_without_anonymized_topics(
kafka_prefix: str, kafka_consumer_group: str, kafka_server_base: str
):
producer = Producer(
{
"bootstrap.servers": kafka_server_base,
"client.id": "test producer",
"acks": "all",
}
)
# Fill Kafka with revision objects only on the standard prefix
producer.produce(
topic=kafka_prefix + ".revision", key=REV["id"], value=value_to_kafka(REV),
)
producer.flush()
# without privileged channel activated on the client side
client = JournalClient(
brokers=[kafka_server_base],
group_id=kafka_consumer_group,
prefix=kafka_prefix,
stop_after_objects=1,
privileged=False,
)
# we only subscribed to the standard prefix
assert client.subscription == [kafka_prefix + ".revision"]
# with privileged channel activated on the client side
client = JournalClient(
brokers=[kafka_server_base],
group_id=kafka_consumer_group,
prefix=kafka_prefix,
stop_after_objects=1,
privileged=True,
)
# we also only subscribed to the standard prefix, since there is no privileged prefix
# on the kafka broker
assert client.subscription == [kafka_prefix + ".revision"]
diff --git a/swh/journal/tests/test_kafka_writer.py b/swh/journal/tests/test_kafka_writer.py
index 1fe02cc..d09d43a 100644
--- a/swh/journal/tests/test_kafka_writer.py
+++ b/swh/journal/tests/test_kafka_writer.py
@@ -1,166 +1,165 @@
# Copyright (C) 2018-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from typing import Iterable
-import pytest
from confluent_kafka import Consumer, Producer
+import pytest
-from swh.model.model import Directory, Revision, Release
-
+from swh.journal.pytest_plugin import assert_all_objects_consumed, consume_messages
from swh.journal.tests.journal_data import TEST_OBJECTS
-from swh.journal.pytest_plugin import consume_messages, assert_all_objects_consumed
-from swh.journal.writer.kafka import KafkaJournalWriter, KafkaDeliveryError
+from swh.journal.writer.kafka import KafkaDeliveryError, KafkaJournalWriter
+from swh.model.model import Directory, Release, Revision
def test_kafka_writer(
kafka_prefix: str,
kafka_server: str,
consumer: Consumer,
privileged_object_types: Iterable[str],
):
writer = KafkaJournalWriter(
brokers=[kafka_server],
client_id="kafka_writer",
prefix=kafka_prefix,
anonymize=False,
)
expected_messages = 0
for object_type, objects in TEST_OBJECTS.items():
writer.write_additions(object_type, objects)
expected_messages += len(objects)
consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages)
assert_all_objects_consumed(consumed_messages)
for key, obj_dict in consumed_messages["revision"]:
obj = Revision.from_dict(obj_dict)
for person in (obj.author, obj.committer):
assert not (
len(person.fullname) == 32
and person.name is None
and person.email is None
)
for key, obj_dict in consumed_messages["release"]:
obj = Release.from_dict(obj_dict)
for person in (obj.author,):
assert not (
len(person.fullname) == 32
and person.name is None
and person.email is None
)
def test_kafka_writer_anonymized(
kafka_prefix: str,
kafka_server: str,
consumer: Consumer,
privileged_object_types: Iterable[str],
):
writer = KafkaJournalWriter(
brokers=[kafka_server],
client_id="kafka_writer",
prefix=kafka_prefix,
anonymize=True,
)
expected_messages = 0
for object_type, objects in TEST_OBJECTS.items():
writer.write_additions(object_type, objects)
expected_messages += len(objects)
if object_type in privileged_object_types:
expected_messages += len(objects)
consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages)
assert_all_objects_consumed(consumed_messages, exclude=["revision", "release"])
for key, obj_dict in consumed_messages["revision"]:
obj = Revision.from_dict(obj_dict)
for person in (obj.author, obj.committer):
assert (
len(person.fullname) == 32
and person.name is None
and person.email is None
)
for key, obj_dict in consumed_messages["release"]:
obj = Release.from_dict(obj_dict)
for person in (obj.author,):
assert (
len(person.fullname) == 32
and person.name is None
and person.email is None
)
def test_write_delivery_failure(
kafka_prefix: str, kafka_server: str, consumer: Consumer
):
class MockKafkaError:
"""A mocked kafka error"""
def str(self):
return "Mocked Kafka Error"
def name(self):
return "SWH_MOCK_ERROR"
class KafkaJournalWriterFailDelivery(KafkaJournalWriter):
"""A journal writer which always fails delivering messages"""
def _on_delivery(self, error, message):
"""Replace the inbound error with a fake delivery error"""
super()._on_delivery(MockKafkaError(), message)
kafka_prefix += ".swh.journal.objects"
writer = KafkaJournalWriterFailDelivery(
brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix,
)
empty_dir = Directory(entries=())
with pytest.raises(KafkaDeliveryError) as exc:
writer.write_addition("directory", empty_dir)
assert "Failed deliveries" in exc.value.message
assert len(exc.value.delivery_failures) == 1
delivery_failure = exc.value.delivery_failures[0]
assert delivery_failure.key == empty_dir.id
assert delivery_failure.code == "SWH_MOCK_ERROR"
def test_write_delivery_timeout(
kafka_prefix: str, kafka_server: str, consumer: Consumer
):
produced = []
class MockProducer(Producer):
"""A kafka producer which pretends to produce messages, but never sends any
delivery acknowledgements"""
def produce(self, **kwargs):
produced.append(kwargs)
writer = KafkaJournalWriter(
brokers=[kafka_server],
client_id="kafka_writer",
prefix=kafka_prefix,
flush_timeout=1,
producer_class=MockProducer,
)
empty_dir = Directory(entries=())
with pytest.raises(KafkaDeliveryError) as exc:
writer.write_addition("directory", empty_dir)
assert len(produced) == 1
assert "timeout" in exc.value.message
assert len(exc.value.delivery_failures) == 1
delivery_failure = exc.value.delivery_failures[0]
assert delivery_failure.key == empty_dir.id
assert delivery_failure.code == "SWH_FLUSH_TIMEOUT"
diff --git a/swh/journal/tests/test_pytest_plugin.py b/swh/journal/tests/test_pytest_plugin.py
index 14e49d7..02c0ef7 100644
--- a/swh/journal/tests/test_pytest_plugin.py
+++ b/swh/journal/tests/test_pytest_plugin.py
@@ -1,71 +1,72 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from typing import Iterator
+
from confluent_kafka.admin import AdminClient
def test_kafka_server(kafka_server_base: str):
ip, port_str = kafka_server_base.split(":")
assert ip == "127.0.0.1"
assert int(port_str)
admin = AdminClient({"bootstrap.servers": kafka_server_base})
topics = admin.list_topics()
assert len(topics.brokers) == 1
def test_kafka_server_with_topics(
kafka_server: str,
kafka_prefix: str,
object_types: Iterator[str],
privileged_object_types: Iterator[str],
):
admin = AdminClient({"bootstrap.servers": kafka_server})
# check unprivileged topics are present
topics = {
topic
for topic in admin.list_topics().topics
if topic.startswith(f"{kafka_prefix}.")
}
assert topics == {f"{kafka_prefix}.{obj}" for obj in object_types}
# check privileged topics are present
topics = {
topic
for topic in admin.list_topics().topics
if topic.startswith(f"{kafka_prefix}_privileged.")
}
assert topics == {
f"{kafka_prefix}_privileged.{obj}" for obj in privileged_object_types
}
def test_test_config(test_config: dict, kafka_prefix: str, kafka_server_base: str):
assert test_config == {
"consumer_id": "swh.journal.consumer",
"stop_after_objects": 1,
"storage": {"cls": "memory", "args": {}},
"object_types": {
"content",
"directory",
"metadata_authority",
"metadata_fetcher",
"origin",
"origin_visit",
"origin_visit_status",
"raw_extrinsic_metadata",
"release",
"revision",
"snapshot",
"skipped_content",
},
"privileged_object_types": {"release", "revision",},
"brokers": [kafka_server_base],
"prefix": kafka_prefix,
}
diff --git a/swh/journal/tests/test_serializers.py b/swh/journal/tests/test_serializers.py
index 1a1fdd9..4cbe236 100644
--- a/swh/journal/tests/test_serializers.py
+++ b/swh/journal/tests/test_serializers.py
@@ -1,77 +1,76 @@
# Copyright (C) 2017-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import itertools
-
from collections import OrderedDict
+import itertools
from typing import Iterable
from swh.journal import serializers
from .conftest import TEST_OBJECTS
def test_key_to_kafka_repeatable():
"""Check the kafka key encoding is repeatable"""
base_dict = {
"a": "foo",
"b": "bar",
"c": "baz",
}
key = serializers.key_to_kafka(base_dict)
for dict_keys in itertools.permutations(base_dict):
d = OrderedDict()
for k in dict_keys:
d[k] = base_dict[k]
assert key == serializers.key_to_kafka(d)
def test_get_key():
"""Test whether get_key works on all our objects"""
for object_type, objects in TEST_OBJECTS.items():
for obj in objects:
assert serializers.object_key(object_type, obj) is not None
def test_pprint_key():
"""Test whether pprint_key works on all our objects"""
for object_type, objects in TEST_OBJECTS.items():
for obj in objects:
key = serializers.object_key(object_type, obj)
pprinted_key = serializers.pprint_key(key)
assert isinstance(pprinted_key, str)
if isinstance(key, dict):
assert pprinted_key[0] == "{" and pprinted_key[-1] == "}"
for dict_key in key.keys():
assert f"{dict_key}:" in pprinted_key
if isinstance(key, bytes):
assert pprinted_key == key.hex()
def test_kafka_to_key():
"""Standard back and forth serialization with keys
"""
# All KeyType(s)
keys: Iterable[serializers.KeyType] = [
{"a": "foo", "b": "bar", "c": "baz",},
{"a": b"foobarbaz",},
b"foo",
]
for object_type, objects in TEST_OBJECTS.items():
for obj in objects:
key = serializers.object_key(object_type, obj)
keys.append(key)
for key in keys:
ktk = serializers.key_to_kafka(key)
v = serializers.kafka_to_key(ktk)
assert v == key
diff --git a/swh/journal/writer/inmemory.py b/swh/journal/writer/inmemory.py
index 61a9269..9c81b6a 100644
--- a/swh/journal/writer/inmemory.py
+++ b/swh/journal/writer/inmemory.py
@@ -1,40 +1,39 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
-
from multiprocessing import Manager
from typing import List
from swh.model.model import BaseModel
from .kafka import ModelObject
logger = logging.getLogger(__name__)
class InMemoryJournalWriter:
def __init__(self):
# Share the list of objects across processes, for RemoteAPI tests.
self.manager = Manager()
self.objects = self.manager.list()
self.privileged_objects = self.manager.list()
def write_addition(
self, object_type: str, object_: ModelObject, privileged: bool = False
) -> None:
assert isinstance(object_, BaseModel)
if privileged:
self.privileged_objects.append((object_type, object_))
else:
self.objects.append((object_type, object_))
write_update = write_addition
def write_additions(
self, object_type: str, objects: List[ModelObject], privileged: bool = False
) -> None:
for object_ in objects:
self.write_addition(object_type, object_, privileged)
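
A minimal usage sketch of this in-memory writer, which is handy in tests that do not need a Kafka broker; the empty Directory is just an example object:

from swh.journal.writer.inmemory import InMemoryJournalWriter
from swh.model.model import Directory

writer = InMemoryJournalWriter()
empty_dir = Directory(entries=())
writer.write_addition("directory", empty_dir)

# Written objects end up in process-shared lists, split by privilege level.
assert list(writer.objects) == [("directory", empty_dir)]
assert list(writer.privileged_objects) == []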
diff --git a/swh/journal/writer/kafka.py b/swh/journal/writer/kafka.py
index 1dccdcb..e5dc547 100644
--- a/swh/journal/writer/kafka.py
+++ b/swh/journal/writer/kafka.py
@@ -1,239 +1,239 @@
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
import time
from typing import Dict, Iterable, List, NamedTuple, Optional, Type
-from confluent_kafka import Producer, KafkaException
+from confluent_kafka import KafkaException, Producer
from swh.journal.serializers import (
KeyType,
ModelObject,
+ key_to_kafka,
object_key,
pprint_key,
- key_to_kafka,
value_to_kafka,
)
logger = logging.getLogger(__name__)
class DeliveryTag(NamedTuple):
"""Unique tag allowing us to check for a message delivery"""
topic: str
kafka_key: bytes
class DeliveryFailureInfo(NamedTuple):
"""Verbose information for failed deliveries"""
object_type: str
key: KeyType
message: str
code: str
def get_object_type(topic: str) -> str:
"""Get the object type from a topic string"""
return topic.rsplit(".", 1)[-1]
class KafkaDeliveryError(Exception):
"""Delivery failed on some kafka messages."""
def __init__(self, message: str, delivery_failures: Iterable[DeliveryFailureInfo]):
self.message = message
self.delivery_failures = list(delivery_failures)
def pretty_failures(self) -> str:
return ", ".join(
f"{f.object_type} {pprint_key(f.key)} ({f.message})"
for f in self.delivery_failures
)
def __str__(self):
return f"KafkaDeliveryError({self.message}, [{self.pretty_failures()}])"
class KafkaJournalWriter:
"""This class is used to write serialized versions of swh.model.model objects to a
series of Kafka topics.
Topics used to send object representations are built from a ``prefix`` plus the
type of the object:
``{prefix}.{object_type}``
Objects can be sent as is, or can be anonymized. The anonymization feature, when
activated, will write anonymized versions of model objects in the main topic, and
stock (non-anonymized) objects will be sent to a dedicated (privileged) set of
topics:
``{prefix}_privileged.{object_type}``
The anonymization of a swh.model object is the result of calling its
``BaseModel.anonymize()`` method. An object is considered anonymizable if this
method returns a (non-None) value.
Args:
brokers: list of broker addresses and ports.
prefix: the prefix used to build the topic names for objects.
client_id: the id of the writer sent to kafka.
producer_config: extra configuration keys passed to the `Producer`.
flush_timeout: timeout, in seconds, after which the `flush` operation
will fail if some message deliveries are still pending.
producer_class: override for the kafka producer class.
anonymize: if True, activate the anonymization feature.
"""
def __init__(
self,
brokers: Iterable[str],
prefix: str,
client_id: str,
producer_config: Optional[Dict] = None,
flush_timeout: float = 120,
producer_class: Type[Producer] = Producer,
anonymize: bool = False,
):
self._prefix = prefix
self._prefix_privileged = f"{self._prefix}_privileged"
self.anonymize = anonymize
if not producer_config:
producer_config = {}
if "message.max.bytes" not in producer_config:
producer_config = {
"message.max.bytes": 100 * 1024 * 1024,
**producer_config,
}
self.producer = producer_class(
{
"bootstrap.servers": ",".join(brokers),
"client.id": client_id,
"on_delivery": self._on_delivery,
"error_cb": self._error_cb,
"logger": logger,
"acks": "all",
**producer_config,
}
)
# Delivery management
self.flush_timeout = flush_timeout
# delivery tag -> original object "key" mapping
self.deliveries_pending: Dict[DeliveryTag, KeyType] = {}
# List of (object_type, key, error_msg, error_name) for failed deliveries
self.delivery_failures: List[DeliveryFailureInfo] = []
def _error_cb(self, error):
if error.fatal():
raise KafkaException(error)
logger.info("Received non-fatal kafka error: %s", error)
def _on_delivery(self, error, message):
(topic, key) = delivery_tag = DeliveryTag(message.topic(), message.key())
sent_key = self.deliveries_pending.pop(delivery_tag, None)
if error is not None:
self.delivery_failures.append(
DeliveryFailureInfo(
get_object_type(topic), sent_key, error.str(), error.name()
)
)
def send(self, topic: str, key: KeyType, value):
kafka_key = key_to_kafka(key)
self.producer.produce(
topic=topic, key=kafka_key, value=value_to_kafka(value),
)
self.deliveries_pending[DeliveryTag(topic, kafka_key)] = key
def delivery_error(self, message) -> KafkaDeliveryError:
"""Get all failed deliveries, and clear them"""
ret = self.delivery_failures
self.delivery_failures = []
while self.deliveries_pending:
delivery_tag, orig_key = self.deliveries_pending.popitem()
(topic, kafka_key) = delivery_tag
ret.append(
DeliveryFailureInfo(
get_object_type(topic),
orig_key,
"No delivery before flush() timeout",
"SWH_FLUSH_TIMEOUT",
)
)
return KafkaDeliveryError(message, ret)
def flush(self):
start = time.monotonic()
self.producer.flush(self.flush_timeout)
while self.deliveries_pending:
if time.monotonic() - start > self.flush_timeout:
break
self.producer.poll(0.1)
if self.deliveries_pending:
# Delivery timeout
raise self.delivery_error(
"flush() exceeded timeout (%ss)" % self.flush_timeout,
)
elif self.delivery_failures:
raise self.delivery_error("Failed deliveries after flush()")
def _sanitize_object(
self, object_type: str, object_: ModelObject
) -> Dict[str, str]:
dict_ = object_.to_dict()
if object_type == "content":
dict_.pop("data", None)
return dict_
def _write_addition(self, object_type: str, object_: ModelObject) -> None:
"""Write a single object to the journal"""
key = object_key(object_type, object_)
if self.anonymize:
anon_object_ = object_.anonymize()
if anon_object_: # can be either None, or an anonymized object
# if the object is anonymizable, send the non-anonymized version in the
# privileged channel
topic = f"{self._prefix_privileged}.{object_type}"
dict_ = self._sanitize_object(object_type, object_)
logger.debug("topic: %s, key: %s, value: %s", topic, key, dict_)
self.send(topic, key=key, value=dict_)
object_ = anon_object_
topic = f"{self._prefix}.{object_type}"
dict_ = self._sanitize_object(object_type, object_)
logger.debug("topic: %s, key: %s, value: %s", topic, key, dict_)
self.send(topic, key=key, value=dict_)
def write_addition(self, object_type: str, object_: ModelObject) -> None:
"""Write a single object to the journal"""
self._write_addition(object_type, object_)
self.flush()
write_update = write_addition
def write_additions(self, object_type: str, objects: Iterable[ModelObject]) -> None:
"""Write a set of objects to the journal"""
for object_ in objects:
self._write_addition(object_type, object_)
self.flush()
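
Finally, a hedged sketch of how the anonymization feature described in the KafkaJournalWriter docstring plays out at write time; the broker address and prefix are placeholders, and the revision comes from the test data earlier in this diff:

from swh.journal.tests.journal_data import REVISIONS
from swh.journal.writer.kafka import KafkaJournalWriter

writer = KafkaJournalWriter(
    brokers=["kafka:9092"],        # placeholder broker address
    prefix="swh.journal.objects",  # placeholder topic prefix
    client_id="example writer",
    anonymize=True,
)

# With anonymize=True, an anonymizable object such as a revision is written
# twice: the stock version goes to swh.journal.objects_privileged.revision,
# and the anonymized version (author/committer reduced to a 32-byte fullname,
# as checked by the tests above) goes to swh.journal.objects.revision.
writer.write_addition("revision", REVISIONS[0])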
