diff --git a/setup.py b/setup.py
index eb30b87..7408fd4 100755
--- a/setup.py
+++ b/setup.py
@@ -1,71 +1,71 @@
#!/usr/bin/env python3
# Copyright (C) 2015-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from setuptools import setup, find_packages
-
-from os import path
from io import open
+from os import path
+
+from setuptools import find_packages, setup
here = path.abspath(path.dirname(__file__))
# Get the long description from the README file
with open(path.join(here, "README.md"), encoding="utf-8") as f:
long_description = f.read()
def parse_requirements(name=None):
if name:
reqf = "requirements-%s.txt" % name
else:
reqf = "requirements.txt"
requirements = []
if not path.exists(reqf):
return requirements
with open(reqf) as f:
for line in f.readlines():
line = line.strip()
if not line or line.startswith("#"):
continue
requirements.append(line)
return requirements
setup(
name="swh.journal",
description="Software Heritage Journal utilities",
long_description=long_description,
long_description_content_type="text/markdown",
python_requires=">=3.7",
author="Software Heritage developers",
author_email="swh-devel@inria.fr",
url="https://forge.softwareheritage.org/diffusion/DJNL/",
packages=find_packages(),
scripts=[],
entry_points="""
[pytest11]
pytest_swh_journal = swh.journal.pytest_plugin
""",
install_requires=parse_requirements() + parse_requirements("swh"),
setup_requires=["setuptools-scm"],
use_scm_version=True,
extras_require={"testing": parse_requirements("test")},
include_package_data=True,
classifiers=[
"Programming Language :: Python :: 3",
"Intended Audience :: Developers",
"License :: OSI Approved :: GNU General Public License v3 (GPLv3)",
"Operating System :: OS Independent",
"Development Status :: 5 - Production/Stable",
],
project_urls={
"Bug Reports": "https://forge.softwareheritage.org/maniphest",
"Funding": "https://www.softwareheritage.org/donate",
"Source": "https://forge.softwareheritage.org/source/swh-journal",
"Documentation": "https://docs.softwareheritage.org/devel/swh-journal/",
},
)
diff --git a/swh/journal/client.py b/swh/journal/client.py
index 68b9a22..704a2fe 100644
--- a/swh/journal/client.py
+++ b/swh/journal/client.py
@@ -1,305 +1,306 @@
# Copyright (C) 2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from collections import defaultdict
import logging
import os
import time
from typing import Any, Dict, List, Optional, Set, Tuple, Union
-from confluent_kafka import Consumer, KafkaException, KafkaError
+from confluent_kafka import Consumer, KafkaError, KafkaException
-from .serializers import kafka_to_value
from swh.journal import DEFAULT_PREFIX
+from .serializers import kafka_to_value
+
logger = logging.getLogger(__name__)
rdkafka_logger = logging.getLogger(__name__ + ".rdkafka")
# Accepted offset reset policies
ACCEPTED_OFFSET_RESET = ["earliest", "latest"]
# Errors that Kafka raises too often and that are not useful; we therefore
# lower their log level to DEBUG instead of INFO.
_SPAMMY_ERRORS = [
KafkaError._NO_OFFSET,
]
def get_journal_client(cls: str, **kwargs: Any):
"""Factory function to instantiate a journal client object.
Currently, only the "kafka" journal client is supported.
"""
if cls == "kafka":
return JournalClient(**kwargs)
raise ValueError("Unknown journal client class `%s`" % cls)
def _error_cb(error):
if error.fatal():
raise KafkaException(error)
if error.code() in _SPAMMY_ERRORS:
logger.debug("Received non-fatal kafka error: %s", error)
else:
logger.info("Received non-fatal kafka error: %s", error)
def _on_commit(error, partitions):
if error is not None:
_error_cb(error)
class JournalClient:
"""A base client for the Software Heritage journal.
The current implementation of the journal uses Apache Kafka
brokers to publish messages under a given topic prefix, with each
object type using a specific topic under that prefix. If the `prefix`
argument is None (the default), it falls back to `'swh.journal.objects'`.
Clients subscribe to events specific to each object type as listed in the
`object_types` argument (if unset, defaults to all existing kafka topics under
the prefix).
Clients can be sharded by setting the `group_id` to a common
value across instances. The journal will share the message
throughput across the nodes sharing the same group_id.
Messages are processed by the `worker_fn` callback passed to the `process`
method, in batches of maximum `batch_size` messages (defaults to 200).
If set, the processing stops after processing `stop_after_objects` messages
in total.
`stop_on_eof` stops the processing when the client has reached the end of
each partition in turn.
`auto_offset_reset` sets the behavior of the client when the consumer group
initializes: `'earliest'` (the default) processes all objects since the
inception of the topics; `'latest'` only processes objects published after the
consumer group was created.
Any other named argument is passed directly to KafkaConsumer().
"""
def __init__(
self,
brokers: Union[str, List[str]],
group_id: str,
prefix: Optional[str] = None,
object_types: Optional[List[str]] = None,
privileged: bool = False,
stop_after_objects: Optional[int] = None,
batch_size: int = 200,
process_timeout: Optional[float] = None,
auto_offset_reset: str = "earliest",
stop_on_eof: bool = False,
**kwargs,
):
if prefix is None:
prefix = DEFAULT_PREFIX
if auto_offset_reset not in ACCEPTED_OFFSET_RESET:
raise ValueError(
"Option 'auto_offset_reset' only accept %s, not %s"
% (ACCEPTED_OFFSET_RESET, auto_offset_reset)
)
if batch_size <= 0:
raise ValueError("Option 'batch_size' needs to be positive")
self.value_deserializer = kafka_to_value
if isinstance(brokers, str):
brokers = [brokers]
debug_logging = rdkafka_logger.isEnabledFor(logging.DEBUG)
if debug_logging and "debug" not in kwargs:
kwargs["debug"] = "consumer"
# Static group instance id management
group_instance_id = os.environ.get("KAFKA_GROUP_INSTANCE_ID")
if group_instance_id:
kwargs["group.instance.id"] = group_instance_id
if "group.instance.id" in kwargs:
# When doing static consumer group membership, set a higher default
# session timeout. The session timeout is the duration after which
# the broker considers that a consumer has left the consumer group
# for good, and triggers a rebalance. Considering our current
# processing pattern, 10 minutes gives the consumer ample time to
# restart before that happens.
if "session.timeout.ms" not in kwargs:
kwargs["session.timeout.ms"] = 10 * 60 * 1000 # 10 minutes
if "session.timeout.ms" in kwargs:
# When the session timeout is set, rdkafka requires the max poll
# interval to be set to a higher value; the max poll interval is
# rdkafka's way of figuring out whether the client's message
# processing thread has stalled: when the max poll interval lapses
# between two calls to consumer.poll(), rdkafka leaves the consumer
# group and terminates the connection to the brokers.
#
# We default to 1.5 times the session timeout
if "max.poll.interval.ms" not in kwargs:
kwargs["max.poll.interval.ms"] = kwargs["session.timeout.ms"] // 2 * 3
consumer_settings = {
**kwargs,
"bootstrap.servers": ",".join(brokers),
"auto.offset.reset": auto_offset_reset,
"group.id": group_id,
"on_commit": _on_commit,
"error_cb": _error_cb,
"enable.auto.commit": False,
"logger": rdkafka_logger,
}
self.stop_on_eof = stop_on_eof
if self.stop_on_eof:
consumer_settings["enable.partition.eof"] = True
logger.debug("Consumer settings: %s", consumer_settings)
self.consumer = Consumer(consumer_settings)
if privileged:
privileged_prefix = f"{prefix}_privileged"
else: # do not attempt to subscribe to privileged topics
privileged_prefix = f"{prefix}"
existing_topics = [
topic
for topic in self.consumer.list_topics(timeout=10).topics.keys()
if (
topic.startswith(f"{prefix}.")
or topic.startswith(f"{privileged_prefix}.")
)
]
if not existing_topics:
raise ValueError(
f"The prefix {prefix} does not match any existing topic "
"on the kafka broker"
)
if not object_types:
object_types = list({topic.split(".")[-1] for topic in existing_topics})
self.subscription = []
unknown_types = []
for object_type in object_types:
topics = (f"{privileged_prefix}.{object_type}", f"{prefix}.{object_type}")
for topic in topics:
if topic in existing_topics:
self.subscription.append(topic)
break
else:
unknown_types.append(object_type)
if unknown_types:
raise ValueError(
f"Topic(s) for object types {','.join(unknown_types)} "
"are unknown on the kafka broker"
)
logger.debug(f"Upstream topics: {existing_topics}")
self.subscribe()
self.stop_after_objects = stop_after_objects
self.process_timeout = process_timeout
self.eof_reached: Set[Tuple[str, str]] = set()
self.batch_size = batch_size
def subscribe(self):
"""Subscribe to topics listed in self.subscription
This can be overridden if you need, for instance, to manually assign partitions.
"""
logger.debug(f"Subscribing to: {self.subscription}")
self.consumer.subscribe(topics=self.subscription)
def process(self, worker_fn):
"""Polls Kafka for a batch of messages, and calls the worker_fn
with these messages.
Args:
worker_fn Callable[Dict[str, List[dict]]]: Function called with the messages as argument.
"""
start_time = time.monotonic()
total_objects_processed = 0
while True:
# timeout for message poll
timeout = 1.0
elapsed = time.monotonic() - start_time
if self.process_timeout:
# +0.01 to prevent busy-waiting on / spamming consumer.poll.
# consumer.consume() returns shortly before its timeout expires
# (a matter of milliseconds), so after it returns a first
# time, it would then be called with a timeout in the order
# of milliseconds, therefore returning immediately, then be
# called again, etc.
if elapsed + 0.01 >= self.process_timeout:
break
timeout = self.process_timeout - elapsed
batch_size = self.batch_size
if self.stop_after_objects:
if total_objects_processed >= self.stop_after_objects:
break
# clamp batch size to avoid overrunning stop_after_objects
batch_size = min(
self.stop_after_objects - total_objects_processed, batch_size,
)
messages = self.consumer.consume(timeout=timeout, num_messages=batch_size)
if not messages:
continue
batch_processed, at_eof = self.handle_messages(messages, worker_fn)
total_objects_processed += batch_processed
if at_eof:
break
return total_objects_processed
def handle_messages(self, messages, worker_fn):
objects: Dict[str, List[Any]] = defaultdict(list)
nb_processed = 0
for message in messages:
error = message.error()
if error is not None:
if error.code() == KafkaError._PARTITION_EOF:
self.eof_reached.add((message.topic(), message.partition()))
else:
_error_cb(error)
continue
if message.value() is None:
# ignore messages with no payload; these can be generated in tests
continue
nb_processed += 1
object_type = message.topic().split(".")[-1]
objects[object_type].append(self.deserialize_message(message))
if objects:
worker_fn(dict(objects))
self.consumer.commit()
at_eof = self.stop_on_eof and all(
(tp.topic, tp.partition) in self.eof_reached
for tp in self.consumer.assignment()
)
return nb_processed, at_eof
def deserialize_message(self, message):
return self.value_deserializer(message.value())
def close(self):
self.consumer.close()
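For reference, here is a minimal usage sketch of the client defined above, driven through get_journal_client() and process(); the broker address, group id, prefix and object types are placeholder values, not taken from this diff.

from swh.journal.client import get_journal_client

# Placeholder connection settings; adapt them to the actual Kafka deployment.
client = get_journal_client(
    "kafka",
    brokers=["localhost:9092"],
    group_id="example-consumer-group",
    prefix="swh.journal.objects",
    object_types=["revision"],
    stop_after_objects=100,
)

def worker_fn(objects):
    # `objects` maps an object type to a list of deserialized message dicts,
    # e.g. {"revision": [{...}, ...]}.
    for object_type, values in objects.items():
        print(object_type, len(values))

# Polls Kafka in batches of at most `batch_size` messages (200 by default),
# calls worker_fn on each batch, and returns the number of objects processed.
client.process(worker_fn)
client.close()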
diff --git a/swh/journal/pytest_plugin.py b/swh/journal/pytest_plugin.py
index 778eeef..c1025e6 100644
--- a/swh/journal/pytest_plugin.py
+++ b/swh/journal/pytest_plugin.py
@@ -1,241 +1,239 @@
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+from collections import defaultdict
import random
import string
-
from typing import Collection, Dict, Iterator, Optional
-from collections import defaultdict
import attr
-import pytest
-
from confluent_kafka import Consumer, KafkaException, Producer
from confluent_kafka.admin import AdminClient
+import pytest
-from swh.journal.serializers import object_key, kafka_to_key, kafka_to_value, pprint_key
+from swh.journal.serializers import kafka_to_key, kafka_to_value, object_key, pprint_key
from swh.journal.tests.journal_data import TEST_OBJECTS
def consume_messages(consumer, kafka_prefix, expected_messages):
"""Consume expected_messages from the consumer;
sort them all into a dict of consumed messages keyed by object type"""
consumed_messages = defaultdict(list)
fetched_messages = 0
retries_left = 1000
while fetched_messages < expected_messages:
if retries_left == 0:
raise ValueError(
"Timed out fetching messages from kafka. "
f"Only {fetched_messages}/{expected_messages} fetched"
)
msg = consumer.poll(timeout=0.01)
if not msg:
retries_left -= 1
continue
error = msg.error()
if error is not None:
if error.fatal():
raise KafkaException(error)
retries_left -= 1
continue
fetched_messages += 1
topic = msg.topic()
assert topic.startswith(f"{kafka_prefix}.") or topic.startswith(
f"{kafka_prefix}_privileged."
), "Unexpected topic"
object_type = topic[len(kafka_prefix + ".") :]
consumed_messages[object_type].append(
(kafka_to_key(msg.key()), kafka_to_value(msg.value()))
)
return consumed_messages
def assert_all_objects_consumed(
consumed_messages: Dict, exclude: Optional[Collection] = None
):
"""Check whether all objects from TEST_OBJECTS have been consumed
`exclude` can be a list of object types for which we do not want to compare the
values (e.g. for anonymized objects).
"""
for object_type, known_objects in TEST_OBJECTS.items():
known_keys = [object_key(object_type, obj) for obj in known_objects]
if not consumed_messages[object_type]:
return
(received_keys, received_values) = zip(*consumed_messages[object_type])
if object_type in ("content", "skipped_content"):
for value in received_values:
value.pop("ctime", None)
if object_type == "content":
known_objects = [attr.evolve(o, data=None) for o in known_objects]
for key in known_keys:
assert key in received_keys, (
f"expected {object_type} key {pprint_key(key)} "
"absent from consumed messages"
)
if exclude and object_type in exclude:
continue
for value in known_objects:
expected_value = value.to_dict()
if value.object_type in ("content", "skipped_content"):
expected_value.pop("ctime", None)
assert expected_value in received_values, (
f"expected {object_type} value {value!r} is "
"absent from consumed messages"
)
@pytest.fixture(scope="function")
def kafka_prefix():
"""Pick a random prefix for kafka topics on each call"""
return "".join(random.choice(string.ascii_lowercase) for _ in range(10))
@pytest.fixture(scope="function")
def kafka_consumer_group(kafka_prefix: str):
"""Pick a random consumer group for kafka consumers on each call"""
return "test-consumer-%s" % kafka_prefix
@pytest.fixture(scope="function")
def object_types():
"""Set of object types to precreate topics for."""
return set(TEST_OBJECTS.keys())
@pytest.fixture(scope="function")
def privileged_object_types():
"""Set of object types to precreate privileged topics for."""
return {"revision", "release"}
@pytest.fixture(scope="function")
def kafka_server(
kafka_server_base: str,
kafka_prefix: str,
object_types: Iterator[str],
privileged_object_types: Iterator[str],
) -> str:
"""A kafka server with existing topics
Unprivileged topics are built as ``{kafka_prefix}.{object_type}`` with object_type
from the ``object_types`` list.
Privileged topics are built as ``{kafka_prefix}_privileged.{object_type}`` with
object_type from the ``privileged_object_types`` list.
"""
topics = [f"{kafka_prefix}.{obj}" for obj in object_types] + [
f"{kafka_prefix}_privileged.{obj}" for obj in privileged_object_types
]
# unfortunately, the Mock broker does not support the CreateTopic admin API, so we
# have to create topics using a Producer.
producer = Producer(
{
"bootstrap.servers": kafka_server_base,
"client.id": "bootstrap producer",
"acks": "all",
}
)
for topic in topics:
producer.produce(topic=topic, value=None)
for i in range(10):
if producer.flush(0.1) == 0:
break
return kafka_server_base
@pytest.fixture(scope="session")
def kafka_server_base() -> Iterator[str]:
"""Create a mock kafka cluster suitable for tests.
Yield a connection string.
Note: this is a generator to keep the mock broker alive during the whole test
session.
see https://github.com/edenhill/librdkafka/blob/master/src/rdkafka_mock.h
"""
admin = AdminClient({"test.mock.num.brokers": "1"})
metadata = admin.list_topics()
brokers = [str(broker) for broker in metadata.brokers.values()]
assert len(brokers) == 1, "More than one broker found in the kafka cluster?!"
broker_connstr, broker_id = brokers[0].split("/")
yield broker_connstr
TEST_CONFIG = {
"consumer_id": "swh.journal.consumer",
"stop_after_objects": 1, # will read 1 object and stop
"storage": {"cls": "memory", "args": {}},
}
@pytest.fixture
def test_config(
kafka_server_base: str,
kafka_prefix: str,
object_types: Iterator[str],
privileged_object_types: Iterator[str],
):
"""Test configuration needed for producer/consumer
"""
return {
**TEST_CONFIG,
"object_types": object_types,
"privileged_object_types": privileged_object_types,
"brokers": [kafka_server_base],
"prefix": kafka_prefix,
}
@pytest.fixture
def consumer(
kafka_server: str, test_config: Dict, kafka_consumer_group: str
) -> Consumer:
"""Get a connected Kafka consumer.
"""
consumer = Consumer(
{
"bootstrap.servers": kafka_server,
"auto.offset.reset": "earliest",
"enable.auto.commit": True,
"group.id": kafka_consumer_group,
}
)
prefix = test_config["prefix"]
kafka_topics = [
f"{prefix}.{object_type}" for object_type in test_config["object_types"]
] + [
f"{prefix}_privileged.{object_type}"
for object_type in test_config["privileged_object_types"]
]
consumer.subscribe(kafka_topics)
yield consumer
consumer.close()
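Since setup.py registers this module under the pytest11 entry point, these fixtures are available to any test suite depending on swh.journal. A hypothetical downstream conftest.py can, for instance, narrow the set of pre-created topics by overriding the object_types fixture:

# conftest.py of a downstream package (hypothetical example)
import pytest

@pytest.fixture(scope="function")
def object_types():
    """Only pre-create topics for origins and revisions in this test suite."""
    return {"origin", "revision"}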
diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py
index ec2a3cf..39670ed 100644
--- a/swh/journal/tests/conftest.py
+++ b/swh/journal/tests/conftest.py
@@ -1,28 +1,27 @@
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
from hypothesis.strategies import one_of
-from swh.model import hypothesis_strategies as strategies
-
# for bw compat
from swh.journal.tests.journal_data import * # noqa
+from swh.model import hypothesis_strategies as strategies
logger = logging.getLogger(__name__)
def objects_d():
return one_of(
strategies.origins().map(lambda x: ("origin", x.to_dict())),
strategies.origin_visits().map(lambda x: ("origin_visit", x.to_dict())),
strategies.snapshots().map(lambda x: ("snapshot", x.to_dict())),
strategies.releases().map(lambda x: ("release", x.to_dict())),
strategies.revisions().map(lambda x: ("revision", x.to_dict())),
strategies.directories().map(lambda x: ("directory", x.to_dict())),
strategies.skipped_contents().map(lambda x: ("skipped_content", x.to_dict())),
strategies.present_contents().map(lambda x: ("content", x.to_dict())),
)
diff --git a/swh/journal/tests/journal_data.py b/swh/journal/tests/journal_data.py
index 336f5ac..f891293 100644
--- a/swh/journal/tests/journal_data.py
+++ b/swh/journal/tests/journal_data.py
@@ -1,343 +1,341 @@
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
-
from typing import Dict, Sequence
import attr
-from swh.model.hashutil import MultiHash, hash_to_bytes, hash_to_hex
from swh.journal.serializers import ModelObject
-
+from swh.model.hashutil import MultiHash, hash_to_bytes, hash_to_hex
from swh.model.identifiers import SWHID
from swh.model.model import (
Content,
Directory,
DirectoryEntry,
MetadataAuthority,
MetadataAuthorityType,
MetadataFetcher,
MetadataTargetType,
ObjectType,
Origin,
OriginVisit,
OriginVisitStatus,
Person,
RawExtrinsicMetadata,
Release,
Revision,
RevisionType,
SkippedContent,
Snapshot,
SnapshotBranch,
TargetType,
Timestamp,
TimestampWithTimezone,
)
UTC = datetime.timezone.utc
CONTENTS = [
Content(
length=4,
data=f"foo{i}".encode(),
status="visible",
**MultiHash.from_data(f"foo{i}".encode()).digest(),
)
for i in range(10)
] + [
Content(
length=14,
data=f"forbidden foo{i}".encode(),
status="hidden",
**MultiHash.from_data(f"forbidden foo{i}".encode()).digest(),
)
for i in range(10)
]
SKIPPED_CONTENTS = [
SkippedContent(
length=4,
status="absent",
reason=f"because chr({i}) != '*'",
**MultiHash.from_data(f"bar{i}".encode()).digest(),
)
for i in range(2)
]
duplicate_content1 = Content(
length=4,
sha1=hash_to_bytes("44973274ccef6ab4dfaaf86599792fa9c3fe4689"),
sha1_git=b"another-foo",
blake2s256=b"another-bar",
sha256=b"another-baz",
status="visible",
)
# Craft a sha1 collision
sha1_array = bytearray(duplicate_content1.sha1_git)
sha1_array[0] += 1
duplicate_content2 = attr.evolve(duplicate_content1, sha1_git=bytes(sha1_array))
DUPLICATE_CONTENTS = [duplicate_content1, duplicate_content2]
COMMITTERS = [
Person(fullname=b"foo", name=b"foo", email=b""),
Person(fullname=b"bar", name=b"bar", email=b""),
]
DATES = [
TimestampWithTimezone(
timestamp=Timestamp(seconds=1234567891, microseconds=0,),
offset=120,
negative_utc=False,
),
TimestampWithTimezone(
timestamp=Timestamp(seconds=1234567892, microseconds=0,),
offset=120,
negative_utc=False,
),
]
REVISIONS = [
Revision(
id=hash_to_bytes("4ca486e65eb68e4986aeef8227d2db1d56ce51b3"),
message=b"hello",
date=DATES[0],
committer=COMMITTERS[0],
author=COMMITTERS[0],
committer_date=DATES[0],
type=RevisionType.GIT,
directory=b"\x01" * 20,
synthetic=False,
metadata=None,
parents=(),
),
Revision(
id=hash_to_bytes("677063f5c405d6fc1781fc56379c9a9adf43d3a0"),
message=b"hello again",
date=DATES[1],
committer=COMMITTERS[1],
author=COMMITTERS[1],
committer_date=DATES[1],
type=RevisionType.MERCURIAL,
directory=b"\x02" * 20,
synthetic=False,
metadata=None,
parents=(),
extra_headers=((b"foo", b"bar"),),
),
]
RELEASES = [
Release(
id=hash_to_bytes("8059dc4e17fcd0e51ca3bcd6b80f4577d281fd08"),
name=b"v0.0.1",
date=TimestampWithTimezone(
timestamp=Timestamp(seconds=1234567890, microseconds=0,),
offset=120,
negative_utc=False,
),
author=COMMITTERS[0],
target_type=ObjectType.REVISION,
target=b"\x04" * 20,
message=b"foo",
synthetic=False,
),
]
ORIGINS = [
Origin(url="https://somewhere.org/den/fox",),
Origin(url="https://overtherainbow.org/fox/den",),
]
ORIGIN_VISITS = [
OriginVisit(
origin=ORIGINS[0].url,
date=datetime.datetime(2013, 5, 7, 4, 20, 39, 369271, tzinfo=UTC),
visit=1,
type="git",
),
OriginVisit(
origin=ORIGINS[1].url,
date=datetime.datetime(2014, 11, 27, 17, 20, 39, tzinfo=UTC),
visit=1,
type="hg",
),
OriginVisit(
origin=ORIGINS[0].url,
date=datetime.datetime(2018, 11, 27, 17, 20, 39, tzinfo=UTC),
visit=2,
type="git",
),
OriginVisit(
origin=ORIGINS[0].url,
date=datetime.datetime(2018, 11, 27, 17, 20, 39, tzinfo=UTC),
visit=3,
type="git",
),
OriginVisit(
origin=ORIGINS[1].url,
date=datetime.datetime(2015, 11, 27, 17, 20, 39, tzinfo=UTC),
visit=2,
type="hg",
),
]
# The origin-visit-status dates need to be shifted slightly into the future from
# their visit date counterparts. Otherwise, we hit the storage's "on conflict
# ignore" policy (because origin-visit-add creates an origin-visit-status with the
# same parameters as the origin-visit {origin, visit, date}).
ORIGIN_VISIT_STATUSES = [
OriginVisitStatus(
origin=ORIGINS[0].url,
date=datetime.datetime(2013, 5, 7, 4, 20, 39, 432222, tzinfo=UTC),
visit=1,
status="ongoing",
snapshot=None,
metadata=None,
),
OriginVisitStatus(
origin=ORIGINS[1].url,
date=datetime.datetime(2014, 11, 27, 17, 21, 12, tzinfo=UTC),
visit=1,
status="ongoing",
snapshot=None,
metadata=None,
),
OriginVisitStatus(
origin=ORIGINS[0].url,
date=datetime.datetime(2018, 11, 27, 17, 20, 59, tzinfo=UTC),
visit=2,
status="ongoing",
snapshot=None,
metadata=None,
),
OriginVisitStatus(
origin=ORIGINS[0].url,
date=datetime.datetime(2018, 11, 27, 17, 20, 49, tzinfo=UTC),
visit=3,
status="full",
snapshot=hash_to_bytes("17d0066a4a80aba4a0e913532ee8ff2014f006a9"),
metadata=None,
),
OriginVisitStatus(
origin=ORIGINS[1].url,
date=datetime.datetime(2015, 11, 27, 17, 22, 18, tzinfo=UTC),
visit=2,
status="partial",
snapshot=hash_to_bytes("8ce268b87faf03850693673c3eb5c9bb66e1ca38"),
metadata=None,
),
]
DIRECTORIES = [
Directory(id=hash_to_bytes("4b825dc642cb6eb9a060e54bf8d69288fbee4904"), entries=()),
Directory(
id=hash_to_bytes("21416d920e0ebf0df4a7888bed432873ed5cb3a7"),
entries=(
DirectoryEntry(
name=b"file1.ext",
perms=0o644,
type="file",
target=CONTENTS[0].sha1_git,
),
DirectoryEntry(
name=b"dir1",
perms=0o755,
type="dir",
target=hash_to_bytes("4b825dc642cb6eb9a060e54bf8d69288fbee4904"),
),
DirectoryEntry(
name=b"subprepo1", perms=0o160000, type="rev", target=REVISIONS[1].id,
),
),
),
]
SNAPSHOTS = [
Snapshot(
id=hash_to_bytes("17d0066a4a80aba4a0e913532ee8ff2014f006a9"),
branches={
b"master": SnapshotBranch(
target_type=TargetType.REVISION, target=REVISIONS[0].id
)
},
),
Snapshot(
id=hash_to_bytes("8ce268b87faf03850693673c3eb5c9bb66e1ca38"),
branches={
b"target/revision": SnapshotBranch(
target_type=TargetType.REVISION, target=REVISIONS[0].id,
),
b"target/alias": SnapshotBranch(
target_type=TargetType.ALIAS, target=b"target/revision"
),
b"target/directory": SnapshotBranch(
target_type=TargetType.DIRECTORY, target=DIRECTORIES[0].id,
),
b"target/release": SnapshotBranch(
target_type=TargetType.RELEASE, target=RELEASES[0].id
),
b"target/snapshot": SnapshotBranch(
target_type=TargetType.SNAPSHOT,
target=hash_to_bytes("17d0066a4a80aba4a0e913532ee8ff2014f006a9"),
),
},
),
]
METADATA_AUTHORITIES = [
MetadataAuthority(
type=MetadataAuthorityType.FORGE, url="http://example.org/", metadata={},
),
]
METADATA_FETCHERS = [
MetadataFetcher(name="test-fetcher", version="1.0.0", metadata={},)
]
RAW_EXTRINSIC_METADATA = [
RawExtrinsicMetadata(
type=MetadataTargetType.ORIGIN,
id="http://example.org/foo.git",
discovery_date=datetime.datetime(2020, 7, 30, 17, 8, 20, tzinfo=UTC),
authority=attr.evolve(METADATA_AUTHORITIES[0], metadata=None),
fetcher=attr.evolve(METADATA_FETCHERS[0], metadata=None),
format="json",
metadata=b'{"foo": "bar"}',
),
RawExtrinsicMetadata(
type=MetadataTargetType.CONTENT,
id=SWHID(object_type="content", object_id=hash_to_hex(CONTENTS[0].sha1_git)),
discovery_date=datetime.datetime(2020, 7, 30, 17, 8, 20, tzinfo=UTC),
authority=attr.evolve(METADATA_AUTHORITIES[0], metadata=None),
fetcher=attr.evolve(METADATA_FETCHERS[0], metadata=None),
format="json",
metadata=b'{"foo": "bar"}',
),
]
TEST_OBJECTS: Dict[str, Sequence[ModelObject]] = {
"content": CONTENTS,
"directory": DIRECTORIES,
"metadata_authority": METADATA_AUTHORITIES,
"metadata_fetcher": METADATA_FETCHERS,
"origin": ORIGINS,
"origin_visit": ORIGIN_VISITS,
"origin_visit_status": ORIGIN_VISIT_STATUSES,
"raw_extrinsic_metadata": RAW_EXTRINSIC_METADATA,
"release": RELEASES,
"revision": REVISIONS,
"snapshot": SNAPSHOTS,
"skipped_content": SKIPPED_CONTENTS,
}
diff --git a/swh/journal/tests/test_client.py b/swh/journal/tests/test_client.py
index 66278b8..3984c46 100644
--- a/swh/journal/tests/test_client.py
+++ b/swh/journal/tests/test_client.py
@@ -1,330 +1,329 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from typing import Dict, List
from unittest.mock import MagicMock
from confluent_kafka import Producer
import pytest
-from swh.model.model import Content
-
from swh.journal.client import JournalClient
from swh.journal.serializers import key_to_kafka, value_to_kafka
+from swh.model.model import Content
REV = {
"message": b"something cool",
"author": {"fullname": b"Peter", "name": None, "email": b"peter@ouiche.lo"},
"committer": {"fullname": b"Stephen", "name": b"From Outer Space", "email": None},
"date": {
"timestamp": {"seconds": 123456789, "microseconds": 123},
"offset": 120,
"negative_utc": False,
},
"committer_date": {
"timestamp": {"seconds": 123123456, "microseconds": 0},
"offset": 0,
"negative_utc": False,
},
"type": "git",
"directory": (
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
b"\x01\x02\x03\x04\x05"
),
"synthetic": False,
"metadata": None,
"parents": (),
"id": b"\x8b\xeb\xd1\x9d\x07\xe2\x1e0\xe2 \x91X\x8d\xbd\x1c\xa8\x86\xdeB\x0c",
}
def test_client(kafka_prefix: str, kafka_consumer_group: str, kafka_server: str):
producer = Producer(
{
"bootstrap.servers": kafka_server,
"client.id": "test producer",
"acks": "all",
}
)
# Fill Kafka
producer.produce(
topic=kafka_prefix + ".revision", key=REV["id"], value=value_to_kafka(REV),
)
producer.flush()
client = JournalClient(
brokers=[kafka_server],
group_id=kafka_consumer_group,
prefix=kafka_prefix,
stop_after_objects=1,
)
worker_fn = MagicMock()
client.process(worker_fn)
worker_fn.assert_called_once_with({"revision": [REV]})
def test_client_eof(kafka_prefix: str, kafka_consumer_group: str, kafka_server: str):
producer = Producer(
{
"bootstrap.servers": kafka_server,
"client.id": "test producer",
"acks": "all",
}
)
# Fill Kafka
producer.produce(
topic=kafka_prefix + ".revision", key=REV["id"], value=value_to_kafka(REV),
)
producer.flush()
client = JournalClient(
brokers=[kafka_server],
group_id=kafka_consumer_group,
prefix=kafka_prefix,
stop_after_objects=None,
stop_on_eof=True,
)
worker_fn = MagicMock()
client.process(worker_fn)
worker_fn.assert_called_once_with({"revision": [REV]})
@pytest.mark.parametrize("batch_size", [1, 5, 100])
def test_client_batch_size(
kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, batch_size: int,
):
num_objects = 2 * batch_size + 1
assert num_objects < 256, "Too many objects, generation will fail"
producer = Producer(
{
"bootstrap.servers": kafka_server,
"client.id": "test producer",
"acks": "all",
}
)
contents = [Content.from_data(bytes([i])) for i in range(num_objects)]
# Fill Kafka
for content in contents:
producer.produce(
topic=kafka_prefix + ".content",
key=key_to_kafka(content.sha1),
value=value_to_kafka(content.to_dict()),
)
producer.flush()
client = JournalClient(
brokers=[kafka_server],
group_id=kafka_consumer_group,
prefix=kafka_prefix,
stop_after_objects=num_objects,
batch_size=batch_size,
)
collected_output: List[Dict] = []
def worker_fn(objects):
received = objects["content"]
assert len(received) <= batch_size
collected_output.extend(received)
client.process(worker_fn)
expected_output = [content.to_dict() for content in contents]
assert len(collected_output) == len(expected_output)
for output in collected_output:
assert output in expected_output
@pytest.fixture()
def kafka_producer(kafka_prefix: str, kafka_server_base: str):
producer = Producer(
{
"bootstrap.servers": kafka_server_base,
"client.id": "test producer",
"acks": "all",
}
)
# Fill Kafka
producer.produce(
topic=kafka_prefix + ".something",
key=key_to_kafka(b"key1"),
value=value_to_kafka("value1"),
)
producer.produce(
topic=kafka_prefix + ".else",
key=key_to_kafka(b"key1"),
value=value_to_kafka("value2"),
)
producer.flush()
return producer
def test_client_subscribe_all(
kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str
):
client = JournalClient(
brokers=[kafka_server_base],
group_id="whatever",
prefix=kafka_prefix,
stop_after_objects=2,
)
assert set(client.subscription) == {
f"{kafka_prefix}.something",
f"{kafka_prefix}.else",
}
worker_fn = MagicMock()
client.process(worker_fn)
worker_fn.assert_called_once_with(
{"something": ["value1"], "else": ["value2"],}
)
def test_client_subscribe_one_topic(
kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str
):
client = JournalClient(
brokers=[kafka_server_base],
group_id="whatever",
prefix=kafka_prefix,
stop_after_objects=1,
object_types=["else"],
)
assert client.subscription == [f"{kafka_prefix}.else"]
worker_fn = MagicMock()
client.process(worker_fn)
worker_fn.assert_called_once_with({"else": ["value2"]})
def test_client_subscribe_absent_topic(
kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str
):
with pytest.raises(ValueError):
JournalClient(
brokers=[kafka_server_base],
group_id="whatever",
prefix=kafka_prefix,
stop_after_objects=1,
object_types=["really"],
)
def test_client_subscribe_absent_prefix(
kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str
):
with pytest.raises(ValueError):
JournalClient(
brokers=[kafka_server_base],
group_id="whatever",
prefix="wrong.prefix",
stop_after_objects=1,
)
with pytest.raises(ValueError):
JournalClient(
brokers=[kafka_server_base],
group_id="whatever",
prefix="wrong.prefix",
stop_after_objects=1,
object_types=["else"],
)
def test_client_subscriptions_with_anonymized_topics(
kafka_prefix: str, kafka_consumer_group: str, kafka_server_base: str
):
producer = Producer(
{
"bootstrap.servers": kafka_server_base,
"client.id": "test producer",
"acks": "all",
}
)
# Fill Kafka with a revision object on both the regular prefix (normally used
# for anonymized objects in this case) and the privileged one
producer.produce(
topic=kafka_prefix + ".revision", key=REV["id"], value=value_to_kafka(REV),
)
producer.produce(
topic=kafka_prefix + "_privileged.revision",
key=REV["id"],
value=value_to_kafka(REV),
)
producer.flush()
# without privileged "channels" activated on the client side
client = JournalClient(
brokers=[kafka_server_base],
group_id=kafka_consumer_group,
prefix=kafka_prefix,
stop_after_objects=1,
privileged=False,
)
# we only subscribed to "standard" topics
assert client.subscription == [kafka_prefix + ".revision"]
# with privileged "channels" activated on the client side
client = JournalClient(
brokers=[kafka_server_base],
group_id=kafka_consumer_group,
prefix=kafka_prefix,
stop_after_objects=1,
privileged=True,
)
# we only subscribed to "privileged" topics
assert client.subscription == [kafka_prefix + "_privileged.revision"]
def test_client_subscriptions_without_anonymized_topics(
kafka_prefix: str, kafka_consumer_group: str, kafka_server_base: str
):
producer = Producer(
{
"bootstrap.servers": kafka_server_base,
"client.id": "test producer",
"acks": "all",
}
)
# Fill Kafka with revision objects only on the standard prefix
producer.produce(
topic=kafka_prefix + ".revision", key=REV["id"], value=value_to_kafka(REV),
)
producer.flush()
# without privileged channel activated on the client side
client = JournalClient(
brokers=[kafka_server_base],
group_id=kafka_consumer_group,
prefix=kafka_prefix,
stop_after_objects=1,
privileged=False,
)
# we only subscribed to the standard prefix
assert client.subscription == [kafka_prefix + ".revision"]
# with privileged channel activated on the client side
client = JournalClient(
brokers=[kafka_server_base],
group_id=kafka_consumer_group,
prefix=kafka_prefix,
stop_after_objects=1,
privileged=True,
)
# we also only subscribed to the standard prefix, since there is no privileged prefix
# on the kafka broker
assert client.subscription == [kafka_prefix + ".revision"]
diff --git a/swh/journal/tests/test_kafka_writer.py b/swh/journal/tests/test_kafka_writer.py
index 1fe02cc..d09d43a 100644
--- a/swh/journal/tests/test_kafka_writer.py
+++ b/swh/journal/tests/test_kafka_writer.py
@@ -1,166 +1,165 @@
# Copyright (C) 2018-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from typing import Iterable
-import pytest
from confluent_kafka import Consumer, Producer
+import pytest
-from swh.model.model import Directory, Revision, Release
-
+from swh.journal.pytest_plugin import assert_all_objects_consumed, consume_messages
from swh.journal.tests.journal_data import TEST_OBJECTS
-from swh.journal.pytest_plugin import consume_messages, assert_all_objects_consumed
-from swh.journal.writer.kafka import KafkaJournalWriter, KafkaDeliveryError
+from swh.journal.writer.kafka import KafkaDeliveryError, KafkaJournalWriter
+from swh.model.model import Directory, Release, Revision
def test_kafka_writer(
kafka_prefix: str,
kafka_server: str,
consumer: Consumer,
privileged_object_types: Iterable[str],
):
writer = KafkaJournalWriter(
brokers=[kafka_server],
client_id="kafka_writer",
prefix=kafka_prefix,
anonymize=False,
)
expected_messages = 0
for object_type, objects in TEST_OBJECTS.items():
writer.write_additions(object_type, objects)
expected_messages += len(objects)
consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages)
assert_all_objects_consumed(consumed_messages)
for key, obj_dict in consumed_messages["revision"]:
obj = Revision.from_dict(obj_dict)
for person in (obj.author, obj.committer):
assert not (
len(person.fullname) == 32
and person.name is None
and person.email is None
)
for key, obj_dict in consumed_messages["release"]:
obj = Release.from_dict(obj_dict)
for person in (obj.author,):
assert not (
len(person.fullname) == 32
and person.name is None
and person.email is None
)
def test_kafka_writer_anonymized(
kafka_prefix: str,
kafka_server: str,
consumer: Consumer,
privileged_object_types: Iterable[str],
):
writer = KafkaJournalWriter(
brokers=[kafka_server],
client_id="kafka_writer",
prefix=kafka_prefix,
anonymize=True,
)
expected_messages = 0
for object_type, objects in TEST_OBJECTS.items():
writer.write_additions(object_type, objects)
expected_messages += len(objects)
if object_type in privileged_object_types:
expected_messages += len(objects)
consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages)
assert_all_objects_consumed(consumed_messages, exclude=["revision", "release"])
for key, obj_dict in consumed_messages["revision"]:
obj = Revision.from_dict(obj_dict)
for person in (obj.author, obj.committer):
assert (
len(person.fullname) == 32
and person.name is None
and person.email is None
)
for key, obj_dict in consumed_messages["release"]:
obj = Release.from_dict(obj_dict)
for person in (obj.author,):
assert (
len(person.fullname) == 32
and person.name is None
and person.email is None
)
def test_write_delivery_failure(
kafka_prefix: str, kafka_server: str, consumer: Consumer
):
class MockKafkaError:
"""A mocked kafka error"""
def str(self):
return "Mocked Kafka Error"
def name(self):
return "SWH_MOCK_ERROR"
class KafkaJournalWriterFailDelivery(KafkaJournalWriter):
"""A journal writer which always fails delivering messages"""
def _on_delivery(self, error, message):
"""Replace the inbound error with a fake delivery error"""
super()._on_delivery(MockKafkaError(), message)
kafka_prefix += ".swh.journal.objects"
writer = KafkaJournalWriterFailDelivery(
brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix,
)
empty_dir = Directory(entries=())
with pytest.raises(KafkaDeliveryError) as exc:
writer.write_addition("directory", empty_dir)
assert "Failed deliveries" in exc.value.message
assert len(exc.value.delivery_failures) == 1
delivery_failure = exc.value.delivery_failures[0]
assert delivery_failure.key == empty_dir.id
assert delivery_failure.code == "SWH_MOCK_ERROR"
def test_write_delivery_timeout(
kafka_prefix: str, kafka_server: str, consumer: Consumer
):
produced = []
class MockProducer(Producer):
"""A kafka producer which pretends to produce messages, but never sends any
delivery acknowledgements"""
def produce(self, **kwargs):
produced.append(kwargs)
writer = KafkaJournalWriter(
brokers=[kafka_server],
client_id="kafka_writer",
prefix=kafka_prefix,
flush_timeout=1,
producer_class=MockProducer,
)
empty_dir = Directory(entries=())
with pytest.raises(KafkaDeliveryError) as exc:
writer.write_addition("directory", empty_dir)
assert len(produced) == 1
assert "timeout" in exc.value.message
assert len(exc.value.delivery_failures) == 1
delivery_failure = exc.value.delivery_failures[0]
assert delivery_failure.key == empty_dir.id
assert delivery_failure.code == "SWH_FLUSH_TIMEOUT"
diff --git a/swh/journal/tests/test_pytest_plugin.py b/swh/journal/tests/test_pytest_plugin.py
index 14e49d7..02c0ef7 100644
--- a/swh/journal/tests/test_pytest_plugin.py
+++ b/swh/journal/tests/test_pytest_plugin.py
@@ -1,71 +1,72 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from typing import Iterator
+
from confluent_kafka.admin import AdminClient
def test_kafka_server(kafka_server_base: str):
ip, port_str = kafka_server_base.split(":")
assert ip == "127.0.0.1"
assert int(port_str)
admin = AdminClient({"bootstrap.servers": kafka_server_base})
topics = admin.list_topics()
assert len(topics.brokers) == 1
def test_kafka_server_with_topics(
kafka_server: str,
kafka_prefix: str,
object_types: Iterator[str],
privileged_object_types: Iterator[str],
):
admin = AdminClient({"bootstrap.servers": kafka_server})
# check unprivileged topics are present
topics = {
topic
for topic in admin.list_topics().topics
if topic.startswith(f"{kafka_prefix}.")
}
assert topics == {f"{kafka_prefix}.{obj}" for obj in object_types}
# check privileged topics are present
topics = {
topic
for topic in admin.list_topics().topics
if topic.startswith(f"{kafka_prefix}_privileged.")
}
assert topics == {
f"{kafka_prefix}_privileged.{obj}" for obj in privileged_object_types
}
def test_test_config(test_config: dict, kafka_prefix: str, kafka_server_base: str):
assert test_config == {
"consumer_id": "swh.journal.consumer",
"stop_after_objects": 1,
"storage": {"cls": "memory", "args": {}},
"object_types": {
"content",
"directory",
"metadata_authority",
"metadata_fetcher",
"origin",
"origin_visit",
"origin_visit_status",
"raw_extrinsic_metadata",
"release",
"revision",
"snapshot",
"skipped_content",
},
"privileged_object_types": {"release", "revision",},
"brokers": [kafka_server_base],
"prefix": kafka_prefix,
}
diff --git a/swh/journal/tests/test_serializers.py b/swh/journal/tests/test_serializers.py
index 1a1fdd9..4cbe236 100644
--- a/swh/journal/tests/test_serializers.py
+++ b/swh/journal/tests/test_serializers.py
@@ -1,77 +1,76 @@
# Copyright (C) 2017-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import itertools
-
from collections import OrderedDict
+import itertools
from typing import Iterable
from swh.journal import serializers
from .conftest import TEST_OBJECTS
def test_key_to_kafka_repeatable():
"""Check the kafka key encoding is repeatable"""
base_dict = {
"a": "foo",
"b": "bar",
"c": "baz",
}
key = serializers.key_to_kafka(base_dict)
for dict_keys in itertools.permutations(base_dict):
d = OrderedDict()
for k in dict_keys:
d[k] = base_dict[k]
assert key == serializers.key_to_kafka(d)
def test_get_key():
"""Test whether get_key works on all our objects"""
for object_type, objects in TEST_OBJECTS.items():
for obj in objects:
assert serializers.object_key(object_type, obj) is not None
def test_pprint_key():
"""Test whether get_key works on all our objects"""
for object_type, objects in TEST_OBJECTS.items():
for obj in objects:
key = serializers.object_key(object_type, obj)
pprinted_key = serializers.pprint_key(key)
assert isinstance(pprinted_key, str)
if isinstance(key, dict):
assert (pprinted_key[0], pprinted_key[-1]) == ("{", "}")
for dict_key in key.keys():
assert f"{dict_key}:" in pprinted_key
if isinstance(key, bytes):
assert pprinted_key == key.hex()
def test_kafka_to_key():
"""Standard back and forth serialization with keys
"""
# All KeyType(s)
keys: Iterable[serializers.KeyType] = [
{"a": "foo", "b": "bar", "c": "baz",},
{"a": b"foobarbaz",},
b"foo",
]
for object_type, objects in TEST_OBJECTS.items():
for obj in objects:
key = serializers.object_key(object_type, obj)
keys.append(key)
for key in keys:
ktk = serializers.key_to_kafka(key)
v = serializers.kafka_to_key(ktk)
assert v == key
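As a complement to the key round trip exercised above, the value helpers can be checked the same way; a small sketch with a made-up value:

from swh.journal import serializers

value = {"url": "https://example.org", "visit": 1}
# value_to_kafka/kafka_to_value should round-trip plain dicts unchanged.
assert serializers.kafka_to_value(serializers.value_to_kafka(value)) == value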
diff --git a/swh/journal/writer/inmemory.py b/swh/journal/writer/inmemory.py
index 61a9269..9c81b6a 100644
--- a/swh/journal/writer/inmemory.py
+++ b/swh/journal/writer/inmemory.py
@@ -1,40 +1,39 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
-
from multiprocessing import Manager
from typing import List
from swh.model.model import BaseModel
from .kafka import ModelObject
logger = logging.getLogger(__name__)
class InMemoryJournalWriter:
def __init__(self):
# Share the list of objects across processes, for RemoteAPI tests.
self.manager = Manager()
self.objects = self.manager.list()
self.privileged_objects = self.manager.list()
def write_addition(
self, object_type: str, object_: ModelObject, privileged: bool = False
) -> None:
assert isinstance(object_, BaseModel)
if privileged:
self.privileged_objects.append((object_type, object_))
else:
self.objects.append((object_type, object_))
write_update = write_addition
def write_additions(
self, object_type: str, objects: List[ModelObject], privileged: bool = False
) -> None:
for object_ in objects:
self.write_addition(object_type, object_, privileged)
diff --git a/swh/journal/writer/kafka.py b/swh/journal/writer/kafka.py
index 1dccdcb..e5dc547 100644
--- a/swh/journal/writer/kafka.py
+++ b/swh/journal/writer/kafka.py
@@ -1,239 +1,239 @@
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
import time
from typing import Dict, Iterable, List, NamedTuple, Optional, Type
-from confluent_kafka import Producer, KafkaException
+from confluent_kafka import KafkaException, Producer
from swh.journal.serializers import (
KeyType,
ModelObject,
+ key_to_kafka,
object_key,
pprint_key,
- key_to_kafka,
value_to_kafka,
)
logger = logging.getLogger(__name__)
class DeliveryTag(NamedTuple):
"""Unique tag allowing us to check for a message delivery"""
topic: str
kafka_key: bytes
class DeliveryFailureInfo(NamedTuple):
"""Verbose information for failed deliveries"""
object_type: str
key: KeyType
message: str
code: str
def get_object_type(topic: str) -> str:
"""Get the object type from a topic string"""
return topic.rsplit(".", 1)[-1]
class KafkaDeliveryError(Exception):
"""Delivery failed on some kafka messages."""
def __init__(self, message: str, delivery_failures: Iterable[DeliveryFailureInfo]):
self.message = message
self.delivery_failures = list(delivery_failures)
def pretty_failures(self) -> str:
return ", ".join(
f"{f.object_type} {pprint_key(f.key)} ({f.message})"
for f in self.delivery_failures
)
def __str__(self):
return f"KafkaDeliveryError({self.message}, [{self.pretty_failures()}])"
class KafkaJournalWriter:
"""This class is used to write serialized versions of swh.model.model objects to a
series of Kafka topics.
Topics used to send objects representations are built from a ``prefix`` plus the
type of the object:
``{prefix}.{object_type}``
Objects can be sent as is, or can be anonymized. The anonymization feature, when
activated, will write anonymized versions of model objects in the main topic, and
stock (non-anonymized) objects will be sent to a dedicated (privileged) set of
topics:
``{prefix}_privileged.{object_type}``
The anonymization of a swh.model object is the result of calling its
``BaseModel.anonymize()`` method. An object is considered anonymizable if this
method returns a (non-None) value.
Args:
brokers: list of broker addresses and ports.
prefix: the prefix used to build the topic names for objects.
client_id: the id of the writer sent to kafka.
producer_config: extra configuration keys passed to the `Producer`.
flush_timeout: timeout, in seconds, after which the `flush` operation
will fail if some message deliveries are still pending.
producer_class: override for the kafka producer class.
anonymize: if True, activate the anonymization feature.
"""
def __init__(
self,
brokers: Iterable[str],
prefix: str,
client_id: str,
producer_config: Optional[Dict] = None,
flush_timeout: float = 120,
producer_class: Type[Producer] = Producer,
anonymize: bool = False,
):
self._prefix = prefix
self._prefix_privileged = f"{self._prefix}_privileged"
self.anonymize = anonymize
if not producer_config:
producer_config = {}
if "message.max.bytes" not in producer_config:
producer_config = {
"message.max.bytes": 100 * 1024 * 1024,
**producer_config,
}
self.producer = producer_class(
{
"bootstrap.servers": ",".join(brokers),
"client.id": client_id,
"on_delivery": self._on_delivery,
"error_cb": self._error_cb,
"logger": logger,
"acks": "all",
**producer_config,
}
)
# Delivery management
self.flush_timeout = flush_timeout
# delivery tag -> original object "key" mapping
self.deliveries_pending: Dict[DeliveryTag, KeyType] = {}
# List of (object_type, key, error_msg, error_name) for failed deliveries
self.delivery_failures: List[DeliveryFailureInfo] = []
def _error_cb(self, error):
if error.fatal():
raise KafkaException(error)
logger.info("Received non-fatal kafka error: %s", error)
def _on_delivery(self, error, message):
(topic, key) = delivery_tag = DeliveryTag(message.topic(), message.key())
sent_key = self.deliveries_pending.pop(delivery_tag, None)
if error is not None:
self.delivery_failures.append(
DeliveryFailureInfo(
get_object_type(topic), sent_key, error.str(), error.name()
)
)
def send(self, topic: str, key: KeyType, value):
kafka_key = key_to_kafka(key)
self.producer.produce(
topic=topic, key=kafka_key, value=value_to_kafka(value),
)
self.deliveries_pending[DeliveryTag(topic, kafka_key)] = key
def delivery_error(self, message) -> KafkaDeliveryError:
"""Get all failed deliveries, and clear them"""
ret = self.delivery_failures
self.delivery_failures = []
while self.deliveries_pending:
delivery_tag, orig_key = self.deliveries_pending.popitem()
(topic, kafka_key) = delivery_tag
ret.append(
DeliveryFailureInfo(
get_object_type(topic),
orig_key,
"No delivery before flush() timeout",
"SWH_FLUSH_TIMEOUT",
)
)
return KafkaDeliveryError(message, ret)
def flush(self):
start = time.monotonic()
self.producer.flush(self.flush_timeout)
while self.deliveries_pending:
if time.monotonic() - start > self.flush_timeout:
break
self.producer.poll(0.1)
if self.deliveries_pending:
# Delivery timeout
raise self.delivery_error(
"flush() exceeded timeout (%ss)" % self.flush_timeout,
)
elif self.delivery_failures:
raise self.delivery_error("Failed deliveries after flush()")
def _sanitize_object(
self, object_type: str, object_: ModelObject
) -> Dict[str, str]:
dict_ = object_.to_dict()
if object_type == "content":
dict_.pop("data", None)
return dict_
def _write_addition(self, object_type: str, object_: ModelObject) -> None:
"""Write a single object to the journal"""
key = object_key(object_type, object_)
if self.anonymize:
anon_object_ = object_.anonymize()
if anon_object_: # can be either None, or an anonymized object
# if the object is anonymizable, send the non-anonymized version in the
# privileged channel
topic = f"{self._prefix_privileged}.{object_type}"
dict_ = self._sanitize_object(object_type, object_)
logger.debug("topic: %s, key: %s, value: %s", topic, key, dict_)
self.send(topic, key=key, value=dict_)
object_ = anon_object_
topic = f"{self._prefix}.{object_type}"
dict_ = self._sanitize_object(object_type, object_)
logger.debug("topic: %s, key: %s, value: %s", topic, key, dict_)
self.send(topic, key=key, value=dict_)
def write_addition(self, object_type: str, object_: ModelObject) -> None:
"""Write a single object to the journal"""
self._write_addition(object_type, object_)
self.flush()
write_update = write_addition
def write_additions(self, object_type: str, objects: Iterable[ModelObject]) -> None:
"""Write a set of objects to the journal"""
for object_ in objects:
self._write_addition(object_type, object_)
self.flush()
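Finally, a minimal sketch of the writer side defined above; the broker address, prefix and client id are placeholders:

from swh.journal.writer.kafka import KafkaJournalWriter
from swh.model.model import Origin

writer = KafkaJournalWriter(
    brokers=["localhost:9092"],
    prefix="swh.journal.objects",
    client_id="example-writer",
    # With anonymize=True, non-anonymized objects would also be mirrored to the
    # {prefix}_privileged.* topics.
    anonymize=False,
)

origins = [Origin(url="https://example.org/repo.git")]
# Serializes each object, sends it to swh.journal.objects.origin, then flushes;
# raises KafkaDeliveryError if any delivery fails or times out.
writer.write_additions("origin", origins)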