diff --git a/PKG-INFO b/PKG-INFO index 5737a75..56f204b 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,72 +1,72 @@ Metadata-Version: 2.1 Name: swh.journal -Version: 0.6.2 +Version: 0.7.0 Summary: Software Heritage Journal utilities Home-page: https://forge.softwareheritage.org/diffusion/DJNL/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-journal Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-journal/ Description: swh-journal =========== Persistent logger of changes to the archive, with publish-subscribe support. See the [documentation](https://docs.softwareheritage.org/devel/swh-journal/index.html#software-heritage-journal) for more details. # Local test As a pre-requisite, you need a kafka installation path. The following target will take care of this: ``` make install ``` Then, provided you are in the right virtual environment as described in the [swh getting-started](https://docs.softwareheritage.org/devel/developer-setup.html#developer-setup): ``` pytest ``` or: ``` tox ``` # Running ## publisher Command: ``` $ swh-journal --config-file ~/.config/swh/journal/publisher.yml \ publisher ``` # Auto-completion To have the completion, add the following in your ~/.virtualenvs/swh/bin/postactivate: ``` eval "$(_SWH_JOURNAL_COMPLETE=$autocomplete_cmd swh-journal)" ``` Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/requirements-swh.txt b/requirements-swh.txt index 3dea752..aa7bbf8 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1 +1 @@ -swh.model >= 0.10.0 +swh.model >= 0.12.0 diff --git a/swh.journal.egg-info/PKG-INFO b/swh.journal.egg-info/PKG-INFO index 5737a75..56f204b 100644 --- a/swh.journal.egg-info/PKG-INFO +++ b/swh.journal.egg-info/PKG-INFO @@ -1,72 +1,72 @@ Metadata-Version: 2.1 Name: swh.journal -Version: 0.6.2 +Version: 0.7.0 Summary: Software Heritage Journal utilities Home-page: https://forge.softwareheritage.org/diffusion/DJNL/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-journal Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-journal/ Description: swh-journal =========== Persistent logger of changes to the archive, with publish-subscribe support. See the [documentation](https://docs.softwareheritage.org/devel/swh-journal/index.html#software-heritage-journal) for more details. # Local test As a pre-requisite, you need a kafka installation path. 
The following target will take care of this: ``` make install ``` Then, provided you are in the right virtual environment as described in the [swh getting-started](https://docs.softwareheritage.org/devel/developer-setup.html#developer-setup): ``` pytest ``` or: ``` tox ``` # Running ## publisher Command: ``` $ swh-journal --config-file ~/.config/swh/journal/publisher.yml \ publisher ``` # Auto-completion To have the completion, add the following in your ~/.virtualenvs/swh/bin/postactivate: ``` eval "$(_SWH_JOURNAL_COMPLETE=$autocomplete_cmd swh-journal)" ``` Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/swh.journal.egg-info/SOURCES.txt b/swh.journal.egg-info/SOURCES.txt index 002dd05..1dd0955 100644 --- a/swh.journal.egg-info/SOURCES.txt +++ b/swh.journal.egg-info/SOURCES.txt @@ -1,48 +1,46 @@ .gitignore .pre-commit-config.yaml AUTHORS CODE_OF_CONDUCT.md LICENSE MANIFEST.in Makefile Makefile.local README.md mypy.ini pyproject.toml pytest.ini requirements-swh.txt requirements-test.txt requirements.txt setup.cfg setup.py tox.ini docs/.gitignore docs/Makefile docs/conf.py docs/index.rst docs/_static/.placeholder docs/_templates/.placeholder swh/__init__.py swh.journal.egg-info/PKG-INFO swh.journal.egg-info/SOURCES.txt swh.journal.egg-info/dependency_links.txt swh.journal.egg-info/entry_points.txt swh.journal.egg-info/requires.txt swh.journal.egg-info/top_level.txt swh/journal/__init__.py swh/journal/client.py swh/journal/py.typed swh/journal/pytest_plugin.py swh/journal/serializers.py swh/journal/tests/__init__.py -swh/journal/tests/conftest.py swh/journal/tests/journal_data.py swh/journal/tests/log4j.properties swh/journal/tests/test_client.py -swh/journal/tests/test_journal_data.py swh/journal/tests/test_kafka_writer.py swh/journal/tests/test_pytest_plugin.py swh/journal/tests/test_serializers.py swh/journal/writer/__init__.py swh/journal/writer/inmemory.py swh/journal/writer/kafka.py \ No newline at end of file diff --git a/swh.journal.egg-info/requires.txt b/swh.journal.egg-info/requires.txt index 171487b..8ab7ac4 100644 --- a/swh.journal.egg-info/requires.txt +++ b/swh.journal.egg-info/requires.txt @@ -1,8 +1,8 @@ confluent-kafka msgpack!=1.0.1,>=1.0.0 tenacity -swh.model>=0.10.0 +swh.model>=0.12.0 [testing] pytest hypothesis diff --git a/swh/journal/pytest_plugin.py b/swh/journal/pytest_plugin.py index f226f2a..01f7f74 100644 --- a/swh/journal/pytest_plugin.py +++ b/swh/journal/pytest_plugin.py @@ -1,258 +1,258 @@ -# Copyright (C) 2019-2020 The Software Heritage developers +# Copyright (C) 2019-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict import random import string from typing import Any, Collection, Dict, Iterator, Optional import attr from confluent_kafka import Consumer, KafkaException, Producer from confluent_kafka.admin import AdminClient import pytest from swh.journal.serializers import kafka_to_key, kafka_to_value, pprint_key -from swh.journal.tests.journal_data import TEST_OBJECTS +from swh.model.tests.swh_model_data import 
TEST_OBJECTS def ensure_lists(value: Any) -> Any: """ >>> ensure_lists(["foo", 42]) ['foo', 42] >>> ensure_lists(("foo", 42)) ['foo', 42] >>> ensure_lists({"a": ["foo", 42]}) {'a': ['foo', 42]} >>> ensure_lists({"a": ("foo", 42)}) {'a': ['foo', 42]} """ if isinstance(value, (tuple, list)): return list(map(ensure_lists, value)) elif isinstance(value, dict): return dict(ensure_lists(list(value.items()))) else: return value def consume_messages(consumer, kafka_prefix, expected_messages): """Consume expected_messages from the consumer; sort them all into a consumed_messages dict""" consumed_messages = defaultdict(list) fetched_messages = 0 retries_left = 1000 while fetched_messages < expected_messages: if retries_left == 0: raise ValueError( "Timed out fetching messages from kafka. " f"Only {fetched_messages}/{expected_messages} fetched" ) msg = consumer.poll(timeout=0.01) if not msg: retries_left -= 1 continue error = msg.error() if error is not None: if error.fatal(): raise KafkaException(error) retries_left -= 1 continue fetched_messages += 1 topic = msg.topic() assert topic.startswith(f"{kafka_prefix}.") or topic.startswith( f"{kafka_prefix}_privileged." ), "Unexpected topic" object_type = topic[len(kafka_prefix + ".") :] consumed_messages[object_type].append( (kafka_to_key(msg.key()), kafka_to_value(msg.value())) ) return consumed_messages def assert_all_objects_consumed( consumed_messages: Dict, exclude: Optional[Collection] = None ): """Check whether all objects from TEST_OBJECTS have been consumed `exclude` can be a list of object types for which we do not want to compare the values (e.g. for anonymized objects). """ for object_type, known_objects in TEST_OBJECTS.items(): known_keys = [obj.unique_key() for obj in known_objects] if not consumed_messages[object_type]: return (received_keys, received_values) = zip(*consumed_messages[object_type]) if object_type in ("content", "skipped_content"): for value in received_values: value.pop("ctime", None) if object_type == "content": known_objects = [attr.evolve(o, data=None) for o in known_objects] for key in known_keys: assert key in received_keys, ( f"expected {object_type} key {pprint_key(key)} " "absent from consumed messages" ) if exclude and object_type in exclude: continue for value in known_objects: expected_value = value.to_dict() if value.object_type in ("content", "skipped_content"): expected_value.pop("ctime", None) assert ensure_lists(expected_value) in received_values, ( f"expected {object_type} value {value!r} is " "absent from consumed messages" )
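# Example: a writer test typically drives the two helpers above together,
# as test_kafka_writer.py does later in this diff (illustrative sketch,
# not part of this module):
#
#   consumed = consume_messages(consumer, kafka_prefix, expected_messages)
#   assert_all_objects_consumed(consumed, exclude=["revision", "release"])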
@pytest.fixture(scope="function") def kafka_prefix(): """Pick a random prefix for kafka topics on each call""" return "".join(random.choice(string.ascii_lowercase) for _ in range(10)) @pytest.fixture(scope="function") def kafka_consumer_group(kafka_prefix: str): """Pick a random consumer group for kafka consumers on each call""" return "test-consumer-%s" % kafka_prefix @pytest.fixture(scope="function") def object_types(): """Set of object types to precreate topics for.""" return set(TEST_OBJECTS.keys()) @pytest.fixture(scope="function") def privileged_object_types(): """Set of object types to precreate privileged topics for.""" return {"revision", "release"} @pytest.fixture(scope="function") def kafka_server( kafka_server_base: str, kafka_prefix: str, object_types: Iterator[str], privileged_object_types: Iterator[str], ) -> str: """A kafka server with existing topics Unprivileged topics are built as ``{kafka_prefix}.{object_type}`` with object_type from the ``object_types`` list. Privileged topics are built as ``{kafka_prefix}_privileged.{object_type}`` with object_type from the ``privileged_object_types`` list. """ topics = [f"{kafka_prefix}.{obj}" for obj in object_types] + [ f"{kafka_prefix}_privileged.{obj}" for obj in privileged_object_types ] # unfortunately, the Mock broker does not support the CreateTopics admin API, so we # have to create topics using a Producer. producer = Producer( { "bootstrap.servers": kafka_server_base, "client.id": "bootstrap producer", "acks": "all", } ) for topic in topics: producer.produce(topic=topic, value=None) for i in range(10): if producer.flush(0.1) == 0: break return kafka_server_base @pytest.fixture(scope="session") def kafka_server_base() -> Iterator[str]: """Create a mock kafka cluster suitable for tests. Yield a connection string. Note: this is a generator to keep the mock broker alive during the whole test session. see https://github.com/edenhill/librdkafka/blob/master/src/rdkafka_mock.h """ admin = AdminClient({"test.mock.num.brokers": "1"}) metadata = admin.list_topics() brokers = [str(broker) for broker in metadata.brokers.values()] assert len(brokers) == 1, "More than one broker found in the kafka cluster?!" broker_connstr, broker_id = brokers[0].split("/") yield broker_connstr TEST_CONFIG = { "consumer_id": "swh.journal.consumer", "stop_after_objects": 1, # will read 1 object and stop "storage": {"cls": "memory", "args": {}}, } @pytest.fixture def test_config( kafka_server_base: str, kafka_prefix: str, object_types: Iterator[str], privileged_object_types: Iterator[str], ): """Test configuration needed for producer/consumer""" return { **TEST_CONFIG, "object_types": object_types, "privileged_object_types": privileged_object_types, "brokers": [kafka_server_base], "prefix": kafka_prefix, }
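# Example: a test module can narrow which topics get precreated by
# overriding the fixtures above, using standard pytest fixture overriding
# (illustrative sketch; the object types chosen here are arbitrary):
#
#   @pytest.fixture
#   def object_types():
#       return {"origin", "origin_visit"}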
""" consumer = Consumer( { "bootstrap.servers": kafka_server, "auto.offset.reset": "earliest", "enable.auto.commit": True, "group.id": kafka_consumer_group, } ) prefix = test_config["prefix"] kafka_topics = [ f"{prefix}.{object_type}" for object_type in test_config["object_types"] ] + [ f"{prefix}_privileged.{object_type}" for object_type in test_config["privileged_object_types"] ] consumer.subscribe(kafka_topics) yield consumer consumer.close() diff --git a/swh/journal/serializers.py b/swh/journal/serializers.py index d39d2db..274ef46 100644 --- a/swh/journal/serializers.py +++ b/swh/journal/serializers.py @@ -1,113 +1,113 @@ -# Copyright (C) 2016-2017 The Software Heritage developers +# Copyright (C) 2016-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from enum import Enum from typing import Any, Union import msgpack from swh.model.model import KeyType class MsgpackExtTypeCodes(Enum): LONG_INT = 1 LONG_NEG_INT = 2 # this as been copied from swh.core.api.serializer # TODO refactor swh.core to make this function available def _msgpack_encode_longint(value): # needed because msgpack will not handle long integers with more than 64 bits # which we unfortunately happen to have to deal with from time to time if value > 0: code = MsgpackExtTypeCodes.LONG_INT.value else: code = MsgpackExtTypeCodes.LONG_NEG_INT.value value = -value length, rem = divmod(value.bit_length(), 8) if rem: length += 1 return msgpack.ExtType(code, int.to_bytes(value, length, "big")) def msgpack_ext_encode_types(obj): if isinstance(obj, int): return _msgpack_encode_longint(obj) return obj def msgpack_ext_hook(code, data): if code == MsgpackExtTypeCodes.LONG_INT.value: return int.from_bytes(data, "big") if code == MsgpackExtTypeCodes.LONG_NEG_INT.value: return -int.from_bytes(data, "big") raise ValueError("Unknown msgpack extended code %s" % code) # for BW compat def decode_types_bw(obj): if set(obj.keys()) == {b"d", b"swhtype"} and obj[b"swhtype"] == "datetime": return datetime.datetime.fromisoformat(obj[b"d"]) return obj def stringify_key_item(k: str, v: Union[str, bytes]) -> str: """Turn the item of a dict key into a string""" if isinstance(v, str): return v if k == "url": return v.decode("utf-8") return v.hex() def pprint_key(key: KeyType) -> str: """Pretty-print a kafka key""" if isinstance(key, dict): return "{%s}" % ", ".join( f"{k}: {stringify_key_item(k, v)}" for k, v in key.items() ) elif isinstance(key, bytes): return key.hex() else: return key def key_to_kafka(key: KeyType) -> bytes: """Serialize a key, possibly a dict, in a predictable way""" p = msgpack.Packer(use_bin_type=True) if isinstance(key, dict): return p.pack_map_pairs(sorted(key.items())) else: return p.pack(key) def kafka_to_key(kafka_key: bytes) -> KeyType: """Deserialize a key""" return msgpack.loads(kafka_key, raw=False) def value_to_kafka(value: Any) -> bytes: """Serialize some data for storage in kafka""" return msgpack.packb( value, use_bin_type=True, datetime=True, # encode datetime as msgpack.Timestamp default=msgpack_ext_encode_types, ) def kafka_to_value(kafka_value: bytes) -> Any: """Deserialize some data stored in kafka""" return msgpack.unpackb( kafka_value, raw=False, object_hook=decode_types_bw, ext_hook=msgpack_ext_hook, strict_map_key=False, timestamp=3, # convert Timestamp in datetime objects (tz UTC) ) diff --git 
a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py deleted file mode 100644 index 39670ed..0000000 --- a/swh/journal/tests/conftest.py +++ /dev/null @@ -1,27 +0,0 @@ -# Copyright (C) 2019-2020 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import logging - -from hypothesis.strategies import one_of - -# for bw compat -from swh.journal.tests.journal_data import * # noqa -from swh.model import hypothesis_strategies as strategies - -logger = logging.getLogger(__name__) - - -def objects_d(): - return one_of( - strategies.origins().map(lambda x: ("origin", x.to_dict())), - strategies.origin_visits().map(lambda x: ("origin_visit", x.to_dict())), - strategies.snapshots().map(lambda x: ("snapshot", x.to_dict())), - strategies.releases().map(lambda x: ("release", x.to_dict())), - strategies.revisions().map(lambda x: ("revision", x.to_dict())), - strategies.directories().map(lambda x: ("directory", x.to_dict())), - strategies.skipped_contents().map(lambda x: ("skipped_content", x.to_dict())), - strategies.present_contents().map(lambda x: ("content", x.to_dict())), - ) diff --git a/swh/journal/tests/journal_data.py b/swh/journal/tests/journal_data.py index 90fee08..2be9a76 100644 --- a/swh/journal/tests/journal_data.py +++ b/swh/journal/tests/journal_data.py @@ -1,348 +1,15 @@ -# Copyright (C) 2019-2020 The Software Heritage developers +# Copyright (C) 2019-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import datetime -from typing import Dict, Sequence +# deprecated, for BW compat only -import attr +import warnings -from swh.model.hashutil import MultiHash, hash_to_bytes, hash_to_hex -from swh.model.identifiers import SWHID -from swh.model.model import ( - BaseModel, - Content, - Directory, - DirectoryEntry, - MetadataAuthority, - MetadataAuthorityType, - MetadataFetcher, - MetadataTargetType, - ObjectType, - Origin, - OriginVisit, - OriginVisitStatus, - Person, - RawExtrinsicMetadata, - Release, - Revision, - RevisionType, - SkippedContent, - Snapshot, - SnapshotBranch, - TargetType, - Timestamp, - TimestampWithTimezone, -) - -UTC = datetime.timezone.utc - -CONTENTS = [ - Content( - length=4, - data=f"foo{i}".encode(), - status="visible", - **MultiHash.from_data(f"foo{i}".encode()).digest(), - ) - for i in range(10) -] + [ - Content( - length=14, - data=f"forbidden foo{i}".encode(), - status="hidden", - **MultiHash.from_data(f"forbidden foo{i}".encode()).digest(), - ) - for i in range(10) -] - -SKIPPED_CONTENTS = [ - SkippedContent( - length=4, - status="absent", - reason=f"because chr({i}) != '*'", - **MultiHash.from_data(f"bar{i}".encode()).digest(), - ) - for i in range(2) -] +from swh.model.tests.swh_model_data import DUPLICATE_CONTENTS, TEST_OBJECTS # noqa -duplicate_content1 = Content( - length=4, - sha1=hash_to_bytes("44973274ccef6ab4dfaaf86599792fa9c3fe4689"), - sha1_git=b"another-foo", - blake2s256=b"another-bar", - sha256=b"another-baz", - status="visible", +warnings.warn( + "This module is deprecated, please use swh.model.tests.swh_model_data instead", + category=DeprecationWarning, ) - -# Craft a sha1 collision -sha1_array = bytearray(duplicate_content1.sha1_git) -sha1_array[0] += 1 -duplicate_content2 = 
attr.evolve(duplicate_content1, sha1_git=bytes(sha1_array)) - - -DUPLICATE_CONTENTS = [duplicate_content1, duplicate_content2] - - -COMMITTERS = [ - Person(fullname=b"foo", name=b"foo", email=b""), - Person(fullname=b"bar", name=b"bar", email=b""), -] - -DATES = [ - TimestampWithTimezone( - timestamp=Timestamp(seconds=1234567891, microseconds=0,), - offset=120, - negative_utc=False, - ), - TimestampWithTimezone( - timestamp=Timestamp(seconds=1234567892, microseconds=0,), - offset=120, - negative_utc=False, - ), -] - -REVISIONS = [ - Revision( - id=hash_to_bytes("4ca486e65eb68e4986aeef8227d2db1d56ce51b3"), - message=b"hello", - date=DATES[0], - committer=COMMITTERS[0], - author=COMMITTERS[0], - committer_date=DATES[0], - type=RevisionType.GIT, - directory=b"\x01" * 20, - synthetic=False, - metadata=None, - parents=(), - ), - Revision( - id=hash_to_bytes("677063f5c405d6fc1781fc56379c9a9adf43d3a0"), - message=b"hello again", - date=DATES[1], - committer=COMMITTERS[1], - author=COMMITTERS[1], - committer_date=DATES[1], - type=RevisionType.MERCURIAL, - directory=b"\x02" * 20, - synthetic=False, - metadata=None, - parents=(), - extra_headers=((b"foo", b"bar"),), - ), -] - -RELEASES = [ - Release( - id=hash_to_bytes("8059dc4e17fcd0e51ca3bcd6b80f4577d281fd08"), - name=b"v0.0.1", - date=TimestampWithTimezone( - timestamp=Timestamp(seconds=1234567890, microseconds=0,), - offset=120, - negative_utc=False, - ), - author=COMMITTERS[0], - target_type=ObjectType.REVISION, - target=b"\x04" * 20, - message=b"foo", - synthetic=False, - ), -] - -ORIGINS = [ - Origin(url="https://somewhere.org/den/fox",), - Origin(url="https://overtherainbow.org/fox/den",), -] - -ORIGIN_VISITS = [ - OriginVisit( - origin=ORIGINS[0].url, - date=datetime.datetime(2013, 5, 7, 4, 20, 39, 369271, tzinfo=UTC), - visit=1, - type="git", - ), - OriginVisit( - origin=ORIGINS[1].url, - date=datetime.datetime(2014, 11, 27, 17, 20, 39, tzinfo=UTC), - visit=1, - type="hg", - ), - OriginVisit( - origin=ORIGINS[0].url, - date=datetime.datetime(2018, 11, 27, 17, 20, 39, tzinfo=UTC), - visit=2, - type="git", - ), - OriginVisit( - origin=ORIGINS[0].url, - date=datetime.datetime(2018, 11, 27, 17, 20, 39, tzinfo=UTC), - visit=3, - type="git", - ), - OriginVisit( - origin=ORIGINS[1].url, - date=datetime.datetime(2015, 11, 27, 17, 20, 39, tzinfo=UTC), - visit=2, - type="hg", - ), -] - -# The origin-visit-status dates needs to be shifted slightly in the future from their -# visit dates counterpart. Otherwise, we are hitting storage-wise the "on conflict" -# ignore policy (because origin-visit-add creates an origin-visit-status with the same -# parameters from the origin-visit {origin, visit, date}... 
-ORIGIN_VISIT_STATUSES = [ - OriginVisitStatus( - origin=ORIGINS[0].url, - date=datetime.datetime(2013, 5, 7, 4, 20, 39, 432222, tzinfo=UTC), - visit=1, - type="git", - status="ongoing", - snapshot=None, - metadata=None, - ), - OriginVisitStatus( - origin=ORIGINS[1].url, - date=datetime.datetime(2014, 11, 27, 17, 21, 12, tzinfo=UTC), - visit=1, - type="hg", - status="ongoing", - snapshot=None, - metadata=None, - ), - OriginVisitStatus( - origin=ORIGINS[0].url, - date=datetime.datetime(2018, 11, 27, 17, 20, 59, tzinfo=UTC), - visit=2, - type="git", - status="ongoing", - snapshot=None, - metadata=None, - ), - OriginVisitStatus( - origin=ORIGINS[0].url, - date=datetime.datetime(2018, 11, 27, 17, 20, 49, tzinfo=UTC), - visit=3, - type="git", - status="full", - snapshot=hash_to_bytes("17d0066a4a80aba4a0e913532ee8ff2014f006a9"), - metadata=None, - ), - OriginVisitStatus( - origin=ORIGINS[1].url, - date=datetime.datetime(2015, 11, 27, 17, 22, 18, tzinfo=UTC), - visit=2, - type="hg", - status="partial", - snapshot=hash_to_bytes("8ce268b87faf03850693673c3eb5c9bb66e1ca38"), - metadata=None, - ), -] - - -DIRECTORIES = [ - Directory(id=hash_to_bytes("4b825dc642cb6eb9a060e54bf8d69288fbee4904"), entries=()), - Directory( - id=hash_to_bytes("21416d920e0ebf0df4a7888bed432873ed5cb3a7"), - entries=( - DirectoryEntry( - name=b"file1.ext", - perms=0o644, - type="file", - target=CONTENTS[0].sha1_git, - ), - DirectoryEntry( - name=b"dir1", - perms=0o755, - type="dir", - target=hash_to_bytes("4b825dc642cb6eb9a060e54bf8d69288fbee4904"), - ), - DirectoryEntry( - name=b"subprepo1", perms=0o160000, type="rev", target=REVISIONS[1].id, - ), - ), - ), -] - - -SNAPSHOTS = [ - Snapshot( - id=hash_to_bytes("17d0066a4a80aba4a0e913532ee8ff2014f006a9"), - branches={ - b"master": SnapshotBranch( - target_type=TargetType.REVISION, target=REVISIONS[0].id - ) - }, - ), - Snapshot( - id=hash_to_bytes("8ce268b87faf03850693673c3eb5c9bb66e1ca38"), - branches={ - b"target/revision": SnapshotBranch( - target_type=TargetType.REVISION, target=REVISIONS[0].id, - ), - b"target/alias": SnapshotBranch( - target_type=TargetType.ALIAS, target=b"target/revision" - ), - b"target/directory": SnapshotBranch( - target_type=TargetType.DIRECTORY, target=DIRECTORIES[0].id, - ), - b"target/release": SnapshotBranch( - target_type=TargetType.RELEASE, target=RELEASES[0].id - ), - b"target/snapshot": SnapshotBranch( - target_type=TargetType.SNAPSHOT, - target=hash_to_bytes("17d0066a4a80aba4a0e913532ee8ff2014f006a9"), - ), - }, - ), -] - - -METADATA_AUTHORITIES = [ - MetadataAuthority( - type=MetadataAuthorityType.FORGE, url="http://example.org/", metadata={}, - ), -] - -METADATA_FETCHERS = [ - MetadataFetcher(name="test-fetcher", version="1.0.0", metadata={},) -] - -RAW_EXTRINSIC_METADATA = [ - RawExtrinsicMetadata( - type=MetadataTargetType.ORIGIN, - target="http://example.org/foo.git", - discovery_date=datetime.datetime(2020, 7, 30, 17, 8, 20, tzinfo=UTC), - authority=attr.evolve(METADATA_AUTHORITIES[0], metadata=None), - fetcher=attr.evolve(METADATA_FETCHERS[0], metadata=None), - format="json", - metadata=b'{"foo": "bar"}', - ), - RawExtrinsicMetadata( - type=MetadataTargetType.CONTENT, - target=SWHID( - object_type="content", object_id=hash_to_hex(CONTENTS[0].sha1_git) - ), - discovery_date=datetime.datetime(2020, 7, 30, 17, 8, 20, tzinfo=UTC), - authority=attr.evolve(METADATA_AUTHORITIES[0], metadata=None), - fetcher=attr.evolve(METADATA_FETCHERS[0], metadata=None), - format="json", - metadata=b'{"foo": "bar"}', - ), -] - - -TEST_OBJECTS: Dict[str, 
Sequence[BaseModel]] = { - "content": CONTENTS, - "directory": DIRECTORIES, - "metadata_authority": METADATA_AUTHORITIES, - "metadata_fetcher": METADATA_FETCHERS, - "origin": ORIGINS, - "origin_visit": ORIGIN_VISITS, - "origin_visit_status": ORIGIN_VISIT_STATUSES, - "raw_extrinsic_metadata": RAW_EXTRINSIC_METADATA, - "release": RELEASES, - "revision": REVISIONS, - "snapshot": SNAPSHOTS, - "skipped_content": SKIPPED_CONTENTS, -} diff --git a/swh/journal/tests/test_journal_data.py b/swh/journal/tests/test_journal_data.py deleted file mode 100644 index a90e722..0000000 --- a/swh/journal/tests/test_journal_data.py +++ /dev/null @@ -1,23 +0,0 @@ -# Copyright (C) 2020 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -from swh.journal.tests.journal_data import TEST_OBJECTS - - -def test_ensure_visit_visit_status_date_consistency(): - """ensure origin-visit-status dates are more recent than their visit counterpart - - The origin-visit-status dates needs to be shifted slightly in the future from their - visit dates counterpart. Otherwise, we are hitting storage-wise the "on conflict" - ignore policy (because origin-visit-add creates an origin-visit-status with the same - parameters from the origin-visit {origin, visit, date}... - - """ - visits = TEST_OBJECTS["origin_visit"] - visit_statuses = TEST_OBJECTS["origin_visit_status"] - for visit, visit_status in zip(visits, visit_statuses): - assert visit.origin == visit_status.origin - assert visit.visit == visit_status.visit - assert visit.date < visit_status.date diff --git a/swh/journal/tests/test_kafka_writer.py b/swh/journal/tests/test_kafka_writer.py index 75735f3..db81330 100644 --- a/swh/journal/tests/test_kafka_writer.py +++ b/swh/journal/tests/test_kafka_writer.py @@ -1,172 +1,172 @@ -# Copyright (C) 2018-2020 The Software Heritage developers +# Copyright (C) 2018-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Iterable from confluent_kafka import Consumer, Producer import pytest from swh.journal.pytest_plugin import assert_all_objects_consumed, consume_messages -from swh.journal.tests.journal_data import TEST_OBJECTS from swh.journal.writer import model_object_dict_sanitizer from swh.journal.writer.kafka import KafkaDeliveryError, KafkaJournalWriter from swh.model.model import BaseModel, Directory, Release, Revision +from swh.model.tests.swh_model_data import TEST_OBJECTS def test_kafka_writer( kafka_prefix: str, kafka_server: str, consumer: Consumer, privileged_object_types: Iterable[str], ): writer = KafkaJournalWriter[BaseModel]( brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix, value_sanitizer=model_object_dict_sanitizer, anonymize=False, ) expected_messages = 0 for object_type, objects in TEST_OBJECTS.items(): writer.write_additions(object_type, objects) expected_messages += len(objects) consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages) assert_all_objects_consumed(consumed_messages) for key, obj_dict in consumed_messages["revision"]: obj = Revision.from_dict(obj_dict) for person in (obj.author, obj.committer): assert not ( len(person.fullname) == 32 and person.name is None and person.email is None ) for key, 
obj_dict in consumed_messages["release"]: obj = Release.from_dict(obj_dict) for person in (obj.author,): assert not ( len(person.fullname) == 32 and person.name is None and person.email is None ) def test_kafka_writer_anonymized( kafka_prefix: str, kafka_server: str, consumer: Consumer, privileged_object_types: Iterable[str], ): writer = KafkaJournalWriter[BaseModel]( brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix, value_sanitizer=model_object_dict_sanitizer, anonymize=True, ) expected_messages = 0 for object_type, objects in TEST_OBJECTS.items(): writer.write_additions(object_type, objects) expected_messages += len(objects) if object_type in privileged_object_types: expected_messages += len(objects) consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages) assert_all_objects_consumed(consumed_messages, exclude=["revision", "release"]) for key, obj_dict in consumed_messages["revision"]: obj = Revision.from_dict(obj_dict) for person in (obj.author, obj.committer): assert ( len(person.fullname) == 32 and person.name is None and person.email is None ) for key, obj_dict in consumed_messages["release"]: obj = Release.from_dict(obj_dict) for person in (obj.author,): assert ( len(person.fullname) == 32 and person.name is None and person.email is None ) def test_write_delivery_failure( kafka_prefix: str, kafka_server: str, consumer: Consumer ): class MockKafkaError: """A mocked kafka error""" def str(self): return "Mocked Kafka Error" def name(self): return "SWH_MOCK_ERROR" class KafkaJournalWriterFailDelivery(KafkaJournalWriter): """A journal writer which always fails delivering messages""" def _on_delivery(self, error, message): """Replace the inbound error with a fake delivery error""" super()._on_delivery(MockKafkaError(), message) kafka_prefix += ".swh.journal.objects" writer = KafkaJournalWriterFailDelivery( brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix, value_sanitizer=model_object_dict_sanitizer, ) empty_dir = Directory(entries=()) with pytest.raises(KafkaDeliveryError) as exc: writer.write_addition("directory", empty_dir) assert "Failed deliveries" in exc.value.message assert len(exc.value.delivery_failures) == 1 delivery_failure = exc.value.delivery_failures[0] assert delivery_failure.key == empty_dir.id assert delivery_failure.code == "SWH_MOCK_ERROR" def test_write_delivery_timeout( kafka_prefix: str, kafka_server: str, consumer: Consumer ): produced = [] class MockProducer(Producer): """A kafka producer which pretends to produce messages, but never sends any delivery acknowledgements""" def produce(self, **kwargs): produced.append(kwargs) writer = KafkaJournalWriter[BaseModel]( brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix, value_sanitizer=model_object_dict_sanitizer, flush_timeout=1, producer_class=MockProducer, ) empty_dir = Directory(entries=()) with pytest.raises(KafkaDeliveryError) as exc: writer.write_addition("directory", empty_dir) assert len(produced) == 1 assert "timeout" in exc.value.message assert len(exc.value.delivery_failures) == 1 delivery_failure = exc.value.delivery_failures[0] assert delivery_failure.key == empty_dir.id assert delivery_failure.code == "SWH_FLUSH_TIMEOUT" diff --git a/swh/journal/tests/test_serializers.py b/swh/journal/tests/test_serializers.py index 9420b10..b94825f 100644 --- a/swh/journal/tests/test_serializers.py +++ b/swh/journal/tests/test_serializers.py @@ -1,116 +1,115 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS 
file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import OrderedDict from datetime import datetime, timedelta, timezone import itertools from typing import Iterable import pytest from swh.journal import serializers - -from .conftest import TEST_OBJECTS +from swh.model.tests.swh_model_data import TEST_OBJECTS def test_key_to_kafka_repeatable(): """Check the kafka key encoding is repeatable""" base_dict = { "a": "foo", "b": "bar", "c": "baz", } key = serializers.key_to_kafka(base_dict) for dict_keys in itertools.permutations(base_dict): d = OrderedDict() for k in dict_keys: d[k] = base_dict[k] assert key == serializers.key_to_kafka(d) def test_pprint_key(): """Test whether pprint_key works on all our objects""" for object_type, objects in TEST_OBJECTS.items(): for obj in objects: key = obj.unique_key() pprinted_key = serializers.pprint_key(key) assert isinstance(pprinted_key, str) if isinstance(key, dict): assert (pprinted_key[0], pprinted_key[-1]) == ("{", "}") for dict_key in key.keys(): assert f"{dict_key}:" in pprinted_key if isinstance(key, bytes): assert pprinted_key == key.hex() def test_kafka_to_key(): """Standard back and forth serialization with keys """ # All KeyType(s) keys: Iterable[serializers.KeyType] = [ {"a": "foo", "b": "bar", "c": "baz",}, {"a": b"foobarbaz",}, b"foo", ] for object_type, objects in TEST_OBJECTS.items(): for obj in objects: key = obj.unique_key() keys.append(key) for key in keys: ktk = serializers.key_to_kafka(key) v = serializers.kafka_to_key(ktk) assert v == key # limits of supported int values by msgpack MININT = -(2 ** 63) MAXINT = 2 ** 64 - 1 intvalues = [ MININT * 2, MININT - 1, MININT, MININT + 1, -10, 0, 10, MAXINT - 1, MAXINT, MAXINT + 1, MAXINT * 2, ] @pytest.mark.parametrize("value", intvalues) def test_encode_int(value): assert serializers.kafka_to_value(serializers.value_to_kafka(value)) == value datevalues = [ datetime.now(tz=timezone.utc), datetime.now(tz=timezone(timedelta(hours=-23, minutes=-59))), datetime.now(tz=timezone(timedelta(hours=23, minutes=59))), datetime(1, 1, 1, 1, 1, tzinfo=timezone.utc), datetime(2100, 1, 1, 1, 1, tzinfo=timezone.utc), ] @pytest.mark.parametrize("value", datevalues) def test_encode_datetime(value): assert serializers.kafka_to_value(serializers.value_to_kafka(value)) == value @pytest.mark.parametrize("value", datevalues) def test_encode_datetime_bw(value): bwdate = {b"swhtype": "datetime", b"d": value.isoformat()} assert serializers.kafka_to_value(serializers.value_to_kafka(bwdate)) == value
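A quick round-trip through the serializers exercised above (a minimal sketch, assuming `swh.journal` is installed; the values are illustrative):

```python
from datetime import datetime, timezone

from swh.journal import serializers

# Dict keys are packed with sorted items, so the encoding does not depend
# on insertion order.
k1 = serializers.key_to_kafka({"a": "foo", "b": "bar"})
k2 = serializers.key_to_kafka({"b": "bar", "a": "foo"})
assert k1 == k2
assert serializers.kafka_to_key(k1) == {"a": "foo", "b": "bar"}

# Datetimes are encoded as msgpack Timestamps; integers wider than 64 bits
# go through the LONG_INT / LONG_NEG_INT extension types defined above.
value = {"date": datetime(2021, 1, 1, tzinfo=timezone.utc), "big": 2 ** 70}
assert serializers.kafka_to_value(serializers.value_to_kafka(value)) == value
```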