diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 573f9ef..24811ab 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -1,58 +1,49 @@
 repos:
 - repo: https://github.com/pre-commit/pre-commit-hooks
   rev: v2.4.0
   hooks:
   - id: trailing-whitespace
   - id: check-json
   - id: check-yaml
 
-# we need the master of pyflakes to have support for @overload on methods,
-# so we use a local config for now
-# - repo: https://gitlab.com/pycqa/flake8
-#   hooks:
-#   - id: flake8
-- repo: local
+- repo: https://gitlab.com/pycqa/flake8
+  rev: 3.8.4
   hooks:
   - id: flake8
-    name: flake8
-    entry: flake8
-    pass_filenames: true
-    language: system
-    types: [python]
 
 - repo: https://github.com/codespell-project/codespell
   rev: v1.16.0
   hooks:
   - id: codespell
 
 - repo: local
   hooks:
   - id: mypy
     name: mypy
     entry: mypy
     args: [swh]
     pass_filenames: false
     language: system
     types: [python]
 
 - repo: https://github.com/PyCQA/isort
   rev: 5.5.2
   hooks:
   - id: isort
 
 - repo: https://github.com/python/black
   rev: 19.10b0
   hooks:
   - id: black
 
 # unfortunately, we are far from being able to enable this...
 # - repo: https://github.com/PyCQA/pydocstyle.git
 #   rev: 4.0.0
 #   hooks:
 #   - id: pydocstyle
 #     name: pydocstyle
 #     description: pydocstyle is a static analysis tool for checking compliance with Python docstring conventions.
 #     entry: pydocstyle --convention=google
 #     language: python
 #     types: [python]
diff --git a/PKG-INFO b/PKG-INFO
index 2a84d35..eca5f37 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,72 +1,72 @@
 Metadata-Version: 2.1
 Name: swh.journal
-Version: 0.4.3
+Version: 0.5.0
 Summary: Software Heritage Journal utilities
 Home-page: https://forge.softwareheritage.org/diffusion/DJNL/
 Author: Software Heritage developers
 Author-email: swh-devel@inria.fr
 License: UNKNOWN
 Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
 Project-URL: Funding, https://www.softwareheritage.org/donate
 Project-URL: Source, https://forge.softwareheritage.org/source/swh-journal
 Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-journal/
 Description: swh-journal
         ===========
         
         Persistent logger of changes to the archive, with publish-subscribe support.
         
         See the [documentation](https://docs.softwareheritage.org/devel/swh-journal/index.html#software-heritage-journal)
         for more details.
         
         # Local test
         
         As a pre-requisite, you need a kafka installation path.
        The following target will take care of this:
         
         ```
         make install
         ```
         
         Then, provided you are in the right virtual environment as described
         in the [swh getting-started](https://docs.softwareheritage.org/devel/developer-setup.html#developer-setup):
         
         ```
         pytest
         ```
         
         or:
         
         ```
         tox
         ```
         
         # Running
         
         ## publisher
         
         Command:
         ```
         $ swh-journal --config-file ~/.config/swh/journal/publisher.yml \
             publisher
         ```
         
         # Auto-completion
         
         To enable completion, add the following to your
         ~/.virtualenvs/swh/bin/postactivate:
         
         ```
         eval "$(_SWH_JOURNAL_COMPLETE=$autocomplete_cmd swh-journal)"
         ```
 Platform: UNKNOWN
 Classifier: Programming Language :: Python :: 3
 Classifier: Intended Audience :: Developers
 Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
 Classifier: Operating System :: OS Independent
 Classifier: Development Status :: 5 - Production/Stable
 Requires-Python: >=3.7
 Description-Content-Type: text/markdown
 Provides-Extra: testing
diff --git a/requirements-swh.txt b/requirements-swh.txt
index 1108901..0851e17 100644
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,2 +1,2 @@
 swh.core[db,http] >= 0.0.60
-swh.model >= 0.6.6
+swh.model >= 0.7.2
diff --git a/swh.journal.egg-info/PKG-INFO b/swh.journal.egg-info/PKG-INFO
index 2a84d35..eca5f37 100644
--- a/swh.journal.egg-info/PKG-INFO
+++ b/swh.journal.egg-info/PKG-INFO
@@ -1,72 +1,72 @@
 Metadata-Version: 2.1
 Name: swh.journal
-Version: 0.4.3
+Version: 0.5.0
 Summary: Software Heritage Journal utilities
 Home-page: https://forge.softwareheritage.org/diffusion/DJNL/
 Author: Software Heritage developers
 Author-email: swh-devel@inria.fr
 License: UNKNOWN
 Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
 Project-URL: Funding, https://www.softwareheritage.org/donate
 Project-URL: Source, https://forge.softwareheritage.org/source/swh-journal
 Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-journal/
 Description: swh-journal
         ===========
         
         Persistent logger of changes to the archive, with publish-subscribe support.
         
         See the [documentation](https://docs.softwareheritage.org/devel/swh-journal/index.html#software-heritage-journal)
         for more details.
         
         # Local test
         
         As a pre-requisite, you need a kafka installation path.
        The following target will take care of this:
         
         ```
         make install
         ```
         
         Then, provided you are in the right virtual environment as described
         in the [swh getting-started](https://docs.softwareheritage.org/devel/developer-setup.html#developer-setup):
         
         ```
         pytest
         ```
         
         or:
         
         ```
         tox
         ```
         
         # Running
         
         ## publisher
         
         Command:
         ```
         $ swh-journal --config-file ~/.config/swh/journal/publisher.yml \
             publisher
         ```
         
         # Auto-completion
         
         To enable completion, add the following to your
         ~/.virtualenvs/swh/bin/postactivate:
         
         ```
         eval "$(_SWH_JOURNAL_COMPLETE=$autocomplete_cmd swh-journal)"
         ```
 Platform: UNKNOWN
 Classifier: Programming Language :: Python :: 3
 Classifier: Intended Audience :: Developers
 Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
 Classifier: Operating System :: OS Independent
 Classifier: Development Status :: 5 - Production/Stable
 Requires-Python: >=3.7
 Description-Content-Type: text/markdown
 Provides-Extra: testing
diff --git a/swh.journal.egg-info/SOURCES.txt b/swh.journal.egg-info/SOURCES.txt
index 0445a6b..002dd05 100644
--- a/swh.journal.egg-info/SOURCES.txt
+++ b/swh.journal.egg-info/SOURCES.txt
@@ -1,49 +1,48 @@
 .gitignore
 .pre-commit-config.yaml
 AUTHORS
 CODE_OF_CONDUCT.md
 LICENSE
 MANIFEST.in
 Makefile
 Makefile.local
 README.md
 mypy.ini
 pyproject.toml
 pytest.ini
 requirements-swh.txt
 requirements-test.txt
 requirements.txt
 setup.cfg
 setup.py
 tox.ini
 docs/.gitignore
 docs/Makefile
 docs/conf.py
 docs/index.rst
 docs/_static/.placeholder
 docs/_templates/.placeholder
 swh/__init__.py
 swh.journal.egg-info/PKG-INFO
 swh.journal.egg-info/SOURCES.txt
 swh.journal.egg-info/dependency_links.txt
 swh.journal.egg-info/entry_points.txt
 swh.journal.egg-info/requires.txt
 swh.journal.egg-info/top_level.txt
 swh/journal/__init__.py
-swh/journal/cli.py
 swh/journal/client.py
 swh/journal/py.typed
 swh/journal/pytest_plugin.py
 swh/journal/serializers.py
 swh/journal/tests/__init__.py
 swh/journal/tests/conftest.py
 swh/journal/tests/journal_data.py
 swh/journal/tests/log4j.properties
 swh/journal/tests/test_client.py
 swh/journal/tests/test_journal_data.py
 swh/journal/tests/test_kafka_writer.py
 swh/journal/tests/test_pytest_plugin.py
 swh/journal/tests/test_serializers.py
 swh/journal/writer/__init__.py
 swh/journal/writer/inmemory.py
 swh/journal/writer/kafka.py
\ No newline at end of file
diff --git a/swh.journal.egg-info/requires.txt b/swh.journal.egg-info/requires.txt
index 6967495..1906592 100644
--- a/swh.journal.egg-info/requires.txt
+++ b/swh.journal.egg-info/requires.txt
@@ -1,9 +1,9 @@
 confluent-kafka
 msgpack
 tenacity
 swh.core[db,http]>=0.0.60
-swh.model>=0.6.6
+swh.model>=0.7.2
 
 [testing]
 pytest
 hypothesis
diff --git a/swh/journal/cli.py b/swh/journal/cli.py
deleted file mode 100644
index dcbc8a8..0000000
--- a/swh/journal/cli.py
+++ /dev/null
@@ -1,93 +0,0 @@
-# Copyright (C) 2016-2019 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import logging
-
-import click
-
-from swh.core.cli import CONTEXT_SETTINGS
-
-
-@click.group(name="journal", context_settings=CONTEXT_SETTINGS)
-@click.option(
-    "--config-file",
-    "-C",
-    default=None,
-    type=click.Path(exists=True, dir_okay=False,),
-    help="Configuration file.",
-)
-@click.pass_context
-def cli(ctx, config_file):
-    """DEPRECATED Software Heritage Journal tools.
- """ - pass - - -@cli.command() -@click.option( - "--stop-after-objects", - "-n", - default=None, - type=int, - help="Stop after processing this many objects. Default is to " "run forever.", -) -@click.pass_context -def replay(ctx, stop_after_objects): - """DEPRECATED: use `swh storage replay` instead. - - Requires swh.storage >= 0.0.188. - """ - ctx.fail("DEPRECATED") - - -@cli.command() -@click.argument("object_type") -@click.option("--start-object", default=None) -@click.option("--end-object", default=None) -@click.option("--dry-run", is_flag=True, default=False) -@click.pass_context -def backfiller(ctx, object_type, start_object, end_object, dry_run): - """DEPRECATED: use `swh storage backfill` instead. - - Requires swh.storage >= 0.0.188. - - """ - ctx.fail("DEPRECATED") - - -@cli.command("content-replay") -@click.option( - "--stop-after-objects", - "-n", - default=None, - type=int, - help="Stop after processing this many objects. Default is to " "run forever.", -) -@click.option( - "--exclude-sha1-file", - default=None, - type=click.File("rb"), - help="File containing a sorted array of hashes to be excluded.", -) -@click.option( - "--check-dst/--no-check-dst", - default=True, - help="Check whether the destination contains the object before " "copying.", -) -@click.pass_context -def content_replay(ctx, stop_after_objects, exclude_sha1_file, check_dst): - """DEPRECATED: use `swh objstorage replay` instead. - - This needs the swh.objstorage.replayer package.""" - ctx.fail("DEPRECATED") - - -def main(): - logging.basicConfig() - return cli(auto_envvar_prefix="SWH_JOURNAL") - - -if __name__ == "__main__": - main() diff --git a/swh/journal/pytest_plugin.py b/swh/journal/pytest_plugin.py index c1025e6..9f8d458 100644 --- a/swh/journal/pytest_plugin.py +++ b/swh/journal/pytest_plugin.py @@ -1,239 +1,239 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict import random import string from typing import Collection, Dict, Iterator, Optional import attr from confluent_kafka import Consumer, KafkaException, Producer from confluent_kafka.admin import AdminClient import pytest -from swh.journal.serializers import kafka_to_key, kafka_to_value, object_key, pprint_key +from swh.journal.serializers import kafka_to_key, kafka_to_value, pprint_key from swh.journal.tests.journal_data import TEST_OBJECTS def consume_messages(consumer, kafka_prefix, expected_messages): """Consume expected_messages from the consumer; Sort them all into a consumed_objects dict""" consumed_messages = defaultdict(list) fetched_messages = 0 retries_left = 1000 while fetched_messages < expected_messages: if retries_left == 0: raise ValueError( "Timed out fetching messages from kafka. " f"Only {fetched_messages}/{expected_messages} fetched" ) msg = consumer.poll(timeout=0.01) if not msg: retries_left -= 1 continue error = msg.error() if error is not None: if error.fatal(): raise KafkaException(error) retries_left -= 1 continue fetched_messages += 1 topic = msg.topic() assert topic.startswith(f"{kafka_prefix}.") or topic.startswith( f"{kafka_prefix}_privileged." 
), "Unexpected topic" object_type = topic[len(kafka_prefix + ".") :] consumed_messages[object_type].append( (kafka_to_key(msg.key()), kafka_to_value(msg.value())) ) return consumed_messages def assert_all_objects_consumed( consumed_messages: Dict, exclude: Optional[Collection] = None ): """Check whether all objects from TEST_OBJECTS have been consumed `exclude` can be a list of object types for which we do not want to compare the values (eg. for anonymized object). """ for object_type, known_objects in TEST_OBJECTS.items(): - known_keys = [object_key(object_type, obj) for obj in known_objects] + known_keys = [obj.unique_key() for obj in known_objects] if not consumed_messages[object_type]: return (received_keys, received_values) = zip(*consumed_messages[object_type]) if object_type in ("content", "skipped_content"): for value in received_values: value.pop("ctime", None) if object_type == "content": known_objects = [attr.evolve(o, data=None) for o in known_objects] for key in known_keys: assert key in received_keys, ( f"expected {object_type} key {pprint_key(key)} " "absent from consumed messages" ) if exclude and object_type in exclude: continue for value in known_objects: expected_value = value.to_dict() if value.object_type in ("content", "skipped_content"): expected_value.pop("ctime", None) assert expected_value in received_values, ( f"expected {object_type} value {value!r} is " "absent from consumed messages" ) @pytest.fixture(scope="function") def kafka_prefix(): """Pick a random prefix for kafka topics on each call""" return "".join(random.choice(string.ascii_lowercase) for _ in range(10)) @pytest.fixture(scope="function") def kafka_consumer_group(kafka_prefix: str): """Pick a random consumer group for kafka consumers on each call""" return "test-consumer-%s" % kafka_prefix @pytest.fixture(scope="function") def object_types(): """Set of object types to precreate topics for.""" return set(TEST_OBJECTS.keys()) @pytest.fixture(scope="function") def privileged_object_types(): """Set of object types to precreate privileged topics for.""" return {"revision", "release"} @pytest.fixture(scope="function") def kafka_server( kafka_server_base: str, kafka_prefix: str, object_types: Iterator[str], privileged_object_types: Iterator[str], ) -> str: """A kafka server with existing topics Unprivileged topics are built as ``{kafka_prefix}.{object_type}`` with object_type from the ``object_types`` list. Privileged topics are built as ``{kafka_prefix}_privileged.{object_type}`` with object_type from the ``privileged_object_types`` list. """ topics = [f"{kafka_prefix}.{obj}" for obj in object_types] + [ f"{kafka_prefix}_privileged.{obj}" for obj in privileged_object_types ] # unfortunately, the Mock broker does not support the CreatTopic admin API, so we # have to create topics using a Producer. producer = Producer( { "bootstrap.servers": kafka_server_base, "client.id": "bootstrap producer", "acks": "all", } ) for topic in topics: producer.produce(topic=topic, value=None) for i in range(10): if producer.flush(0.1) == 0: break return kafka_server_base @pytest.fixture(scope="session") def kafka_server_base() -> Iterator[str]: """Create a mock kafka cluster suitable for tests. Yield a connection string. Note: this is a generator to keep the mock broker alive during the whole test session. 
see https://github.com/edenhill/librdkafka/blob/master/src/rdkafka_mock.h """ admin = AdminClient({"test.mock.num.brokers": "1"}) metadata = admin.list_topics() brokers = [str(broker) for broker in metadata.brokers.values()] assert len(brokers) == 1, "More than one broker found in the kafka cluster?!" broker_connstr, broker_id = brokers[0].split("/") yield broker_connstr TEST_CONFIG = { "consumer_id": "swh.journal.consumer", "stop_after_objects": 1, # will read 1 object and stop "storage": {"cls": "memory", "args": {}}, } @pytest.fixture def test_config( kafka_server_base: str, kafka_prefix: str, object_types: Iterator[str], privileged_object_types: Iterator[str], ): """Test configuration needed for producer/consumer """ return { **TEST_CONFIG, "object_types": object_types, "privileged_object_types": privileged_object_types, "brokers": [kafka_server_base], "prefix": kafka_prefix, } @pytest.fixture def consumer( kafka_server: str, test_config: Dict, kafka_consumer_group: str ) -> Consumer: """Get a connected Kafka consumer. """ consumer = Consumer( { "bootstrap.servers": kafka_server, "auto.offset.reset": "earliest", "enable.auto.commit": True, "group.id": kafka_consumer_group, } ) prefix = test_config["prefix"] kafka_topics = [ f"{prefix}.{object_type}" for object_type in test_config["object_types"] ] + [ f"{prefix}_privileged.{object_type}" for object_type in test_config["privileged_object_types"] ] consumer.subscribe(kafka_topics) yield consumer consumer.close() diff --git a/swh/journal/serializers.py b/swh/journal/serializers.py index 3dca255..8a6a798 100644 --- a/swh/journal/serializers.py +++ b/swh/journal/serializers.py @@ -1,172 +1,96 @@ # Copyright (C) 2016-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from typing import Any, Dict, Union, overload +from typing import Any, Union import msgpack from swh.core.api.serializers import msgpack_dumps, msgpack_loads -from swh.model.hashutil import DEFAULT_ALGORITHMS from swh.model.model import ( Content, Directory, + KeyType, MetadataAuthority, MetadataFetcher, Origin, OriginVisit, OriginVisitStatus, RawExtrinsicMetadata, Release, Revision, SkippedContent, Snapshot, ) ModelObject = Union[ Content, Directory, MetadataAuthority, MetadataFetcher, Origin, OriginVisit, OriginVisitStatus, RawExtrinsicMetadata, Release, Revision, SkippedContent, Snapshot, ] -KeyType = Union[Dict[str, str], Dict[str, bytes], bytes] - - -# these @overload'ed versions of the object_key method aim at helping mypy figuring -# the correct type-ing. -@overload -def object_key( - object_type: str, object_: Union[Content, Directory, Revision, Release, Snapshot] -) -> bytes: - ... - - -@overload -def object_key( - object_type: str, object_: Union[Origin, SkippedContent] -) -> Dict[str, bytes]: - ... - - -@overload -def object_key( - object_type: str, - object_: Union[ - MetadataAuthority, - MetadataFetcher, - OriginVisit, - OriginVisitStatus, - RawExtrinsicMetadata, - ], -) -> Dict[str, str]: - ... 
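The fixtures above compose as `kafka_server_base` (session-scoped mock cluster) → `kafka_server` (per-test topics pre-created) → `consumer` (subscribed to every per-test topic). A minimal sketch of a test built on them, assuming the `KafkaJournalWriter` covered later in this diff; the test body is illustrative, not part of the plugin:

```python
# Illustrative test, not from this diff; shows how the fixtures are meant
# to be combined with the Kafka writer.
from swh.journal.pytest_plugin import assert_all_objects_consumed, consume_messages
from swh.journal.tests.journal_data import TEST_OBJECTS
from swh.journal.writer.kafka import KafkaJournalWriter


def test_write_then_consume(kafka_server, kafka_prefix, consumer):
    # Produce every test object to the per-test topics ({kafka_prefix}.{type}).
    writer = KafkaJournalWriter(
        brokers=[kafka_server], prefix=kafka_prefix, client_id="test producer",
    )
    expected = 0
    for object_type, objects in TEST_OBJECTS.items():
        writer.write_additions(object_type, objects)
        expected += len(objects)

    # Drain exactly that many messages, then compare them to TEST_OBJECTS.
    consumed = consume_messages(consumer, kafka_prefix, expected)
    assert_all_objects_consumed(consumed)
```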
diff --git a/swh/journal/serializers.py b/swh/journal/serializers.py
index 3dca255..8a6a798 100644
--- a/swh/journal/serializers.py
+++ b/swh/journal/serializers.py
@@ -1,172 +1,96 @@
 # Copyright (C) 2016-2017 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-from typing import Any, Dict, Union, overload
+from typing import Any, Union
 
 import msgpack
 
 from swh.core.api.serializers import msgpack_dumps, msgpack_loads
-from swh.model.hashutil import DEFAULT_ALGORITHMS
 from swh.model.model import (
     Content,
     Directory,
+    KeyType,
     MetadataAuthority,
     MetadataFetcher,
     Origin,
     OriginVisit,
     OriginVisitStatus,
     RawExtrinsicMetadata,
     Release,
     Revision,
     SkippedContent,
     Snapshot,
 )
 
 ModelObject = Union[
     Content,
     Directory,
     MetadataAuthority,
     MetadataFetcher,
     Origin,
     OriginVisit,
     OriginVisitStatus,
     RawExtrinsicMetadata,
     Release,
     Revision,
     SkippedContent,
     Snapshot,
 ]
 
-KeyType = Union[Dict[str, str], Dict[str, bytes], bytes]
-
-
-# these @overload'ed versions of the object_key method aim at helping mypy figuring
-# the correct type-ing.
-@overload
-def object_key(
-    object_type: str, object_: Union[Content, Directory, Revision, Release, Snapshot]
-) -> bytes:
-    ...
-
-
-@overload
-def object_key(
-    object_type: str, object_: Union[Origin, SkippedContent]
-) -> Dict[str, bytes]:
-    ...
-
-
-@overload
-def object_key(
-    object_type: str,
-    object_: Union[
-        MetadataAuthority,
-        MetadataFetcher,
-        OriginVisit,
-        OriginVisitStatus,
-        RawExtrinsicMetadata,
-    ],
-) -> Dict[str, str]:
-    ...
-
-
-def object_key(object_type: str, object_) -> KeyType:
-    if object_type in ("revision", "release", "directory", "snapshot"):
-        return object_.id
-    elif object_type == "content":
-        return object_.sha1  # TODO: use a dict of hashes
-    elif object_type == "skipped_content":
-        return {hash: getattr(object_, hash) for hash in DEFAULT_ALGORITHMS}
-    elif object_type == "origin":
-        return {"url": object_.url}
-    elif object_type == "origin_visit":
-        return {
-            "origin": object_.origin,
-            "date": str(object_.date),
-        }
-    elif object_type == "origin_visit_status":
-        return {
-            "origin": object_.origin,
-            "visit": str(object_.visit),
-            "date": str(object_.date),
-        }
-    elif object_type == "metadata_authority":
-        return {
-            "type": object_.type.value,
-            "url": object_.url,
-        }
-    elif object_type == "metadata_fetcher":
-        return {
-            "name": object_.name,
-            "version": object_.version,
-        }
-    elif object_type == "raw_extrinsic_metadata":
-        return {
-            "type": object_.type.value,
-            "id": str(object_.id),
-            "authority_type": object_.authority.type.value,
-            "authority_url": object_.authority.url,
-            "discovery_date": str(object_.discovery_date),
-            "fetcher_name": object_.fetcher.name,
-            "fetcher_version": object_.fetcher.version,
-        }
-    else:
-        raise ValueError("Unknown object type: %s." % object_type)
-
 
 def stringify_key_item(k: str, v: Union[str, bytes]) -> str:
     """Turn the item of a dict key into a string"""
     if isinstance(v, str):
         return v
     if k == "url":
         return v.decode("utf-8")
     return v.hex()
 
 
 def pprint_key(key: KeyType) -> str:
     """Pretty-print a kafka key"""
     if isinstance(key, dict):
         return "{%s}" % ", ".join(
             f"{k}: {stringify_key_item(k, v)}" for k, v in key.items()
         )
     elif isinstance(key, bytes):
         return key.hex()
     else:
         return key
 
 
 def key_to_kafka(key: KeyType) -> bytes:
     """Serialize a key, possibly a dict, in a predictable way"""
     p = msgpack.Packer(use_bin_type=True)
     if isinstance(key, dict):
         return p.pack_map_pairs(sorted(key.items()))
     else:
         return p.pack(key)
 
 
 def kafka_to_key(kafka_key: bytes) -> KeyType:
     """Deserialize a key"""
     return msgpack.loads(kafka_key, raw=False)
 
 
 def value_to_kafka(value: Any) -> bytes:
     """Serialize some data for storage in kafka"""
     return msgpack_dumps(value)
 
 
 def kafka_to_value(kafka_value: bytes) -> Any:
     """Deserialize some data stored in kafka"""
     value = msgpack_loads(kafka_value)
     return ensure_tuples(value)
 
 
 def ensure_tuples(value: Any) -> Any:
     if isinstance(value, (tuple, list)):
         return tuple(map(ensure_tuples, value))
     elif isinstance(value, dict):
         return dict(ensure_tuples(list(value.items())))
     else:
         return value
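With `object_key()` removed, keys now come from each model object's `unique_key()` method (hence the `swh.model >= 0.7.2` bump), while `key_to_kafka()` keeps the encoding deterministic by packing dict keys as sorted map pairs. An illustrative round-trip, assuming `Origin.unique_key()` returns its URL dict:

```python
# Illustrative round-trip; assumes swh.model >= 0.7.2, where model objects
# grew the unique_key() method used throughout this diff.
from swh.journal.serializers import kafka_to_key, key_to_kafka
from swh.model.model import Origin

origin = Origin(url="https://example.org/repo.git")
key = origin.unique_key()           # expected: {"url": "https://example.org/repo.git"}
packed = key_to_kafka(key)          # msgpack map, pairs packed in sorted order
assert kafka_to_key(packed) == key  # deserialization restores the key

# Sorted packing makes logically-equal dict keys byte-identical, which keeps
# Kafka partitioning stable regardless of dict insertion order:
assert key_to_kafka({"a": "1", "b": "2"}) == key_to_kafka({"b": "2", "a": "1"})
```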
diff --git a/swh/journal/tests/journal_data.py b/swh/journal/tests/journal_data.py
index f891293..2d5fc57 100644
--- a/swh/journal/tests/journal_data.py
+++ b/swh/journal/tests/journal_data.py
@@ -1,341 +1,343 @@
 # Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import datetime
 from typing import Dict, Sequence
 
 import attr
 
 from swh.journal.serializers import ModelObject
 from swh.model.hashutil import MultiHash, hash_to_bytes, hash_to_hex
 from swh.model.identifiers import SWHID
 from swh.model.model import (
     Content,
     Directory,
     DirectoryEntry,
     MetadataAuthority,
     MetadataAuthorityType,
     MetadataFetcher,
     MetadataTargetType,
     ObjectType,
     Origin,
     OriginVisit,
     OriginVisitStatus,
     Person,
     RawExtrinsicMetadata,
     Release,
     Revision,
     RevisionType,
     SkippedContent,
     Snapshot,
     SnapshotBranch,
     TargetType,
     Timestamp,
     TimestampWithTimezone,
 )
 
 UTC = datetime.timezone.utc
 
 CONTENTS = [
     Content(
         length=4,
         data=f"foo{i}".encode(),
         status="visible",
         **MultiHash.from_data(f"foo{i}".encode()).digest(),
     )
     for i in range(10)
 ] + [
     Content(
         length=14,
         data=f"forbidden foo{i}".encode(),
         status="hidden",
         **MultiHash.from_data(f"forbidden foo{i}".encode()).digest(),
     )
     for i in range(10)
 ]
 
 SKIPPED_CONTENTS = [
     SkippedContent(
         length=4,
         status="absent",
         reason=f"because chr({i}) != '*'",
         **MultiHash.from_data(f"bar{i}".encode()).digest(),
     )
     for i in range(2)
 ]
 
 duplicate_content1 = Content(
     length=4,
     sha1=hash_to_bytes("44973274ccef6ab4dfaaf86599792fa9c3fe4689"),
     sha1_git=b"another-foo",
     blake2s256=b"another-bar",
     sha256=b"another-baz",
     status="visible",
 )
 
 # Craft a sha1 collision
 sha1_array = bytearray(duplicate_content1.sha1_git)
 sha1_array[0] += 1
 duplicate_content2 = attr.evolve(duplicate_content1, sha1_git=bytes(sha1_array))
 
 DUPLICATE_CONTENTS = [duplicate_content1, duplicate_content2]
 
 COMMITTERS = [
     Person(fullname=b"foo", name=b"foo", email=b""),
     Person(fullname=b"bar", name=b"bar", email=b""),
 ]
 
 DATES = [
     TimestampWithTimezone(
         timestamp=Timestamp(seconds=1234567891, microseconds=0,),
         offset=120,
         negative_utc=False,
     ),
     TimestampWithTimezone(
         timestamp=Timestamp(seconds=1234567892, microseconds=0,),
         offset=120,
         negative_utc=False,
     ),
 ]
 
 REVISIONS = [
     Revision(
         id=hash_to_bytes("4ca486e65eb68e4986aeef8227d2db1d56ce51b3"),
         message=b"hello",
         date=DATES[0],
         committer=COMMITTERS[0],
         author=COMMITTERS[0],
         committer_date=DATES[0],
         type=RevisionType.GIT,
         directory=b"\x01" * 20,
         synthetic=False,
         metadata=None,
         parents=(),
     ),
     Revision(
         id=hash_to_bytes("677063f5c405d6fc1781fc56379c9a9adf43d3a0"),
         message=b"hello again",
         date=DATES[1],
         committer=COMMITTERS[1],
         author=COMMITTERS[1],
         committer_date=DATES[1],
         type=RevisionType.MERCURIAL,
         directory=b"\x02" * 20,
         synthetic=False,
         metadata=None,
         parents=(),
         extra_headers=((b"foo", b"bar"),),
     ),
 ]
 
 RELEASES = [
     Release(
         id=hash_to_bytes("8059dc4e17fcd0e51ca3bcd6b80f4577d281fd08"),
         name=b"v0.0.1",
         date=TimestampWithTimezone(
             timestamp=Timestamp(seconds=1234567890, microseconds=0,),
             offset=120,
             negative_utc=False,
         ),
         author=COMMITTERS[0],
         target_type=ObjectType.REVISION,
         target=b"\x04" * 20,
         message=b"foo",
         synthetic=False,
     ),
 ]
 
 ORIGINS = [
     Origin(url="https://somewhere.org/den/fox",),
     Origin(url="https://overtherainbow.org/fox/den",),
 ]
 
 ORIGIN_VISITS = [
     OriginVisit(
         origin=ORIGINS[0].url,
         date=datetime.datetime(2013, 5, 7, 4, 20, 39, 369271, tzinfo=UTC),
         visit=1,
         type="git",
     ),
     OriginVisit(
         origin=ORIGINS[1].url,
         date=datetime.datetime(2014, 11, 27, 17, 20, 39, tzinfo=UTC),
         visit=1,
         type="hg",
     ),
     OriginVisit(
         origin=ORIGINS[0].url,
         date=datetime.datetime(2018, 11, 27, 17, 20, 39, tzinfo=UTC),
         visit=2,
         type="git",
     ),
     OriginVisit(
         origin=ORIGINS[0].url,
         date=datetime.datetime(2018, 11, 27, 17, 20, 39, tzinfo=UTC),
         visit=3,
         type="git",
     ),
     OriginVisit(
         origin=ORIGINS[1].url,
         date=datetime.datetime(2015, 11, 27, 17, 20, 39, tzinfo=UTC),
         visit=2,
         type="hg",
     ),
 ]
 
 # The origin-visit-status dates need to be shifted slightly in the future from their
 # visit dates counterpart. Otherwise, we are hitting storage-wise the "on conflict"
 # ignore policy (because origin-visit-add creates an origin-visit-status with the same
 # parameters from the origin-visit {origin, visit, date}...
 ORIGIN_VISIT_STATUSES = [
     OriginVisitStatus(
         origin=ORIGINS[0].url,
         date=datetime.datetime(2013, 5, 7, 4, 20, 39, 432222, tzinfo=UTC),
         visit=1,
         status="ongoing",
         snapshot=None,
         metadata=None,
     ),
     OriginVisitStatus(
         origin=ORIGINS[1].url,
         date=datetime.datetime(2014, 11, 27, 17, 21, 12, tzinfo=UTC),
         visit=1,
         status="ongoing",
         snapshot=None,
         metadata=None,
     ),
     OriginVisitStatus(
         origin=ORIGINS[0].url,
         date=datetime.datetime(2018, 11, 27, 17, 20, 59, tzinfo=UTC),
         visit=2,
         status="ongoing",
         snapshot=None,
         metadata=None,
     ),
     OriginVisitStatus(
         origin=ORIGINS[0].url,
         date=datetime.datetime(2018, 11, 27, 17, 20, 49, tzinfo=UTC),
         visit=3,
         status="full",
         snapshot=hash_to_bytes("17d0066a4a80aba4a0e913532ee8ff2014f006a9"),
         metadata=None,
     ),
     OriginVisitStatus(
         origin=ORIGINS[1].url,
         date=datetime.datetime(2015, 11, 27, 17, 22, 18, tzinfo=UTC),
         visit=2,
         status="partial",
         snapshot=hash_to_bytes("8ce268b87faf03850693673c3eb5c9bb66e1ca38"),
         metadata=None,
     ),
 ]
 
 DIRECTORIES = [
     Directory(id=hash_to_bytes("4b825dc642cb6eb9a060e54bf8d69288fbee4904"), entries=()),
     Directory(
         id=hash_to_bytes("21416d920e0ebf0df4a7888bed432873ed5cb3a7"),
         entries=(
             DirectoryEntry(
                 name=b"file1.ext", perms=0o644, type="file", target=CONTENTS[0].sha1_git,
             ),
             DirectoryEntry(
                 name=b"dir1",
                 perms=0o755,
                 type="dir",
                 target=hash_to_bytes("4b825dc642cb6eb9a060e54bf8d69288fbee4904"),
             ),
             DirectoryEntry(
                 name=b"subprepo1", perms=0o160000, type="rev", target=REVISIONS[1].id,
             ),
         ),
     ),
 ]
 
 SNAPSHOTS = [
     Snapshot(
         id=hash_to_bytes("17d0066a4a80aba4a0e913532ee8ff2014f006a9"),
         branches={
             b"master": SnapshotBranch(
                 target_type=TargetType.REVISION, target=REVISIONS[0].id
             )
         },
     ),
     Snapshot(
         id=hash_to_bytes("8ce268b87faf03850693673c3eb5c9bb66e1ca38"),
         branches={
             b"target/revision": SnapshotBranch(
                 target_type=TargetType.REVISION, target=REVISIONS[0].id,
             ),
             b"target/alias": SnapshotBranch(
                 target_type=TargetType.ALIAS, target=b"target/revision"
             ),
             b"target/directory": SnapshotBranch(
                 target_type=TargetType.DIRECTORY, target=DIRECTORIES[0].id,
             ),
             b"target/release": SnapshotBranch(
                 target_type=TargetType.RELEASE, target=RELEASES[0].id
             ),
             b"target/snapshot": SnapshotBranch(
                 target_type=TargetType.SNAPSHOT,
                 target=hash_to_bytes("17d0066a4a80aba4a0e913532ee8ff2014f006a9"),
             ),
         },
     ),
 ]
 
 METADATA_AUTHORITIES = [
     MetadataAuthority(
         type=MetadataAuthorityType.FORGE, url="http://example.org/", metadata={},
     ),
 ]
 
 METADATA_FETCHERS = [
     MetadataFetcher(name="test-fetcher", version="1.0.0", metadata={},)
 ]
 
 RAW_EXTRINSIC_METADATA = [
     RawExtrinsicMetadata(
         type=MetadataTargetType.ORIGIN,
-        id="http://example.org/foo.git",
+        target="http://example.org/foo.git",
         discovery_date=datetime.datetime(2020, 7, 30, 17, 8, 20, tzinfo=UTC),
         authority=attr.evolve(METADATA_AUTHORITIES[0], metadata=None),
         fetcher=attr.evolve(METADATA_FETCHERS[0], metadata=None),
         format="json",
         metadata=b'{"foo": "bar"}',
     ),
     RawExtrinsicMetadata(
         type=MetadataTargetType.CONTENT,
-        id=SWHID(object_type="content", object_id=hash_to_hex(CONTENTS[0].sha1_git)),
+        target=SWHID(
+            object_type="content", object_id=hash_to_hex(CONTENTS[0].sha1_git)
+        ),
         discovery_date=datetime.datetime(2020, 7, 30, 17, 8, 20, tzinfo=UTC),
         authority=attr.evolve(METADATA_AUTHORITIES[0], metadata=None),
         fetcher=attr.evolve(METADATA_FETCHERS[0], metadata=None),
         format="json",
         metadata=b'{"foo": "bar"}',
     ),
 ]
 
 TEST_OBJECTS: Dict[str, Sequence[ModelObject]] = {
     "content": CONTENTS,
     "directory": DIRECTORIES,
     "metadata_authority": METADATA_AUTHORITIES,
     "metadata_fetcher": METADATA_FETCHERS,
     "origin": ORIGINS,
     "origin_visit": ORIGIN_VISITS,
     "origin_visit_status": ORIGIN_VISIT_STATUSES,
     "raw_extrinsic_metadata": RAW_EXTRINSIC_METADATA,
     "release": RELEASES,
     "revision": REVISIONS,
     "snapshot": SNAPSHOTS,
     "skipped_content": SKIPPED_CONTENTS,
 }
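The `**MultiHash.from_data(...).digest()` spreads in the test data above expand to one keyword argument per hash algorithm. A small illustration of that shape:

```python
# Illustrative only: the dict returned by MultiHash.digest(), which the
# journal_data comprehensions spread into Content/SkippedContent kwargs.
from swh.model.hashutil import MultiHash

digests = MultiHash.from_data(b"foo0").digest()
assert sorted(digests) == ["blake2s256", "sha1", "sha1_git", "sha256"]
# Each value is a raw bytes digest, e.g. sha1 is 20 bytes long:
assert len(digests["sha1"]) == 20
```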
"origin_visit_status": ORIGIN_VISIT_STATUSES, "raw_extrinsic_metadata": RAW_EXTRINSIC_METADATA, "release": RELEASES, "revision": REVISIONS, "snapshot": SNAPSHOTS, "skipped_content": SKIPPED_CONTENTS, } diff --git a/swh/journal/tests/test_serializers.py b/swh/journal/tests/test_serializers.py index 4cbe236..f2b9671 100644 --- a/swh/journal/tests/test_serializers.py +++ b/swh/journal/tests/test_serializers.py @@ -1,76 +1,69 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import OrderedDict import itertools from typing import Iterable from swh.journal import serializers from .conftest import TEST_OBJECTS def test_key_to_kafka_repeatable(): """Check the kafka key encoding is repeatable""" base_dict = { "a": "foo", "b": "bar", "c": "baz", } key = serializers.key_to_kafka(base_dict) for dict_keys in itertools.permutations(base_dict): d = OrderedDict() for k in dict_keys: d[k] = base_dict[k] assert key == serializers.key_to_kafka(d) -def test_get_key(): - """Test whether get_key works on all our objects""" - for object_type, objects in TEST_OBJECTS.items(): - for obj in objects: - assert serializers.object_key(object_type, obj) is not None - - def test_pprint_key(): """Test whether get_key works on all our objects""" for object_type, objects in TEST_OBJECTS.items(): for obj in objects: - key = serializers.object_key(object_type, obj) + key = obj.unique_key() pprinted_key = serializers.pprint_key(key) assert isinstance(pprinted_key, str) if isinstance(key, dict): assert pprinted_key[0], pprinted_key[-1] == "{}" for dict_key in key.keys(): assert f"{dict_key}:" in pprinted_key if isinstance(key, bytes): assert pprinted_key == key.hex() def test_kafka_to_key(): """Standard back and forth serialization with keys """ # All KeyType(s) keys: Iterable[serializers.KeyType] = [ {"a": "foo", "b": "bar", "c": "baz",}, {"a": b"foobarbaz",}, b"foo", ] for object_type, objects in TEST_OBJECTS.items(): for obj in objects: - key = serializers.object_key(object_type, obj) + key = obj.unique_key() keys.append(key) for key in keys: ktk = serializers.key_to_kafka(key) v = serializers.kafka_to_key(ktk) assert v == key diff --git a/swh/journal/writer/inmemory.py b/swh/journal/writer/inmemory.py index 9c81b6a..2c70d67 100644 --- a/swh/journal/writer/inmemory.py +++ b/swh/journal/writer/inmemory.py @@ -1,39 +1,38 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging from multiprocessing import Manager from typing import List +from swh.journal.serializers import ModelObject from swh.model.model import BaseModel -from .kafka import ModelObject - logger = logging.getLogger(__name__) class InMemoryJournalWriter: def __init__(self): # Share the list of objects across processes, for RemoteAPI tests. 
diff --git a/swh/journal/writer/inmemory.py b/swh/journal/writer/inmemory.py
index 9c81b6a..2c70d67 100644
--- a/swh/journal/writer/inmemory.py
+++ b/swh/journal/writer/inmemory.py
@@ -1,39 +1,38 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import logging
 from multiprocessing import Manager
 from typing import List
 
+from swh.journal.serializers import ModelObject
 from swh.model.model import BaseModel
 
-from .kafka import ModelObject
-
 logger = logging.getLogger(__name__)
 
 
 class InMemoryJournalWriter:
     def __init__(self):
         # Share the list of objects across processes, for RemoteAPI tests.
         self.manager = Manager()
         self.objects = self.manager.list()
         self.privileged_objects = self.manager.list()
 
     def write_addition(
         self, object_type: str, object_: ModelObject, privileged: bool = False
     ) -> None:
         assert isinstance(object_, BaseModel)
         if privileged:
             self.privileged_objects.append((object_type, object_))
         else:
             self.objects.append((object_type, object_))
 
     write_update = write_addition
 
     def write_additions(
         self, object_type: str, objects: List[ModelObject], privileged: bool = False
    ) -> None:
         for object_ in objects:
             self.write_addition(object_type, object_, privileged)
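For contrast with the Kafka-backed writer, a minimal usage sketch of the in-memory writer above; objects accumulate in multiprocessing-shared lists instead of topics:

```python
# Minimal sketch, assuming the module layout from this diff.
from swh.journal.writer.inmemory import InMemoryJournalWriter
from swh.model.model import Origin

writer = InMemoryJournalWriter()
origin = Origin(url="https://example.org/repo.git")
writer.write_addition("origin", origin)
writer.write_addition("origin", origin, privileged=True)

# Both lists are Manager().list() proxies, shared across processes.
assert list(writer.objects) == [("origin", origin)]
assert list(writer.privileged_objects) == [("origin", origin)]
```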
""" def __init__( self, brokers: Iterable[str], prefix: str, client_id: str, producer_config: Optional[Dict] = None, flush_timeout: float = 120, producer_class: Type[Producer] = Producer, anonymize: bool = False, ): self._prefix = prefix self._prefix_privileged = f"{self._prefix}_privileged" self.anonymize = anonymize if not producer_config: producer_config = {} if "message.max.bytes" not in producer_config: producer_config = { "message.max.bytes": 100 * 1024 * 1024, **producer_config, } self.producer = producer_class( { "bootstrap.servers": ",".join(brokers), "client.id": client_id, "on_delivery": self._on_delivery, "error_cb": self._error_cb, "logger": logger, "acks": "all", **producer_config, } ) # Delivery management self.flush_timeout = flush_timeout # delivery tag -> original object "key" mapping self.deliveries_pending: Dict[DeliveryTag, KeyType] = {} # List of (object_type, key, error_msg, error_name) for failed deliveries self.delivery_failures: List[DeliveryFailureInfo] = [] def _error_cb(self, error): if error.fatal(): raise KafkaException(error) logger.info("Received non-fatal kafka error: %s", error) def _on_delivery(self, error, message): (topic, key) = delivery_tag = DeliveryTag(message.topic(), message.key()) sent_key = self.deliveries_pending.pop(delivery_tag, None) if error is not None: self.delivery_failures.append( DeliveryFailureInfo( get_object_type(topic), sent_key, error.str(), error.name() ) ) def send(self, topic: str, key: KeyType, value): kafka_key = key_to_kafka(key) self.producer.produce( topic=topic, key=kafka_key, value=value_to_kafka(value), ) self.deliveries_pending[DeliveryTag(topic, kafka_key)] = key def delivery_error(self, message) -> KafkaDeliveryError: """Get all failed deliveries, and clear them""" ret = self.delivery_failures self.delivery_failures = [] while self.deliveries_pending: delivery_tag, orig_key = self.deliveries_pending.popitem() (topic, kafka_key) = delivery_tag ret.append( DeliveryFailureInfo( get_object_type(topic), orig_key, "No delivery before flush() timeout", "SWH_FLUSH_TIMEOUT", ) ) return KafkaDeliveryError(message, ret) def flush(self): start = time.monotonic() self.producer.flush(self.flush_timeout) while self.deliveries_pending: if time.monotonic() - start > self.flush_timeout: break self.producer.poll(0.1) if self.deliveries_pending: # Delivery timeout raise self.delivery_error( "flush() exceeded timeout (%ss)" % self.flush_timeout, ) elif self.delivery_failures: raise self.delivery_error("Failed deliveries after flush()") def _sanitize_object( self, object_type: str, object_: ModelObject ) -> Dict[str, str]: dict_ = object_.to_dict() if object_type == "content": dict_.pop("data", None) return dict_ def _write_addition(self, object_type: str, object_: ModelObject) -> None: """Write a single object to the journal""" - key = object_key(object_type, object_) + key = object_.unique_key() if self.anonymize: anon_object_ = object_.anonymize() if anon_object_: # can be either None, or an anonymized object # if the object is anonymizable, send the non-anonymized version in the # privileged channel topic = f"{self._prefix_privileged}.{object_type}" dict_ = self._sanitize_object(object_type, object_) logger.debug("topic: %s, key: %s, value: %s", topic, key, dict_) self.send(topic, key=key, value=dict_) object_ = anon_object_ topic = f"{self._prefix}.{object_type}" dict_ = self._sanitize_object(object_type, object_) logger.debug("topic: %s, key: %s, value: %s", topic, key, dict_) self.send(topic, key=key, value=dict_) def 
diff --git a/tox.ini b/tox.ini
index 9020b04..bcd7b3f 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,37 +1,37 @@
 [tox]
 envlist=black,flake8,mypy,py3
 
 [testenv]
 extras =
   testing
 deps =
   pytest-cov
   dev: pdbpp
 commands =
   pytest --cov={envsitepackagesdir}/swh/journal \
          {envsitepackagesdir}/swh/journal \
          --cov-branch \
          --doctest-modules {posargs}
 
 [testenv:black]
 skip_install = true
 deps =
-  black
+  black==19.10b0
 commands =
   {envpython} -m black --check swh
 
 [testenv:flake8]
 skip_install = true
 deps =
   git+https://github.com/PyCQA/pyflakes.git
   flake8
 commands =
   {envpython} -m flake8
 
 [testenv:mypy]
 extras =
   testing
 deps =
   mypy
 commands =
   mypy swh