diff --git a/PKG-INFO b/PKG-INFO index df08fb4..4ee5d77 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,72 +1,72 @@ Metadata-Version: 2.1 Name: swh.journal -Version: 0.4.0 +Version: 0.4.1 Summary: Software Heritage Journal utilities Home-page: https://forge.softwareheritage.org/diffusion/DJNL/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-journal Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-journal/ Description: swh-journal =========== Persistent logger of changes to the archive, with publish-subscribe support. See the [documentation](https://docs.softwareheritage.org/devel/swh-journal/index.html#software-heritage-journal) for more details. # Local test As a pre-requisite, you need a kakfa installation path. The following target will take care of this: ``` make install ``` Then, provided you are in the right virtual environment as described in the [swh getting-started](https://docs.softwareheritage.org/devel/developer-setup.html#developer-setup): ``` pytest ``` or: ``` tox ``` # Running ## publisher Command: ``` $ swh-journal --config-file ~/.config/swh/journal/publisher.yml \ publisher ``` # Auto-completion To have the completion, add the following in your ~/.virtualenvs/swh/bin/postactivate: ``` eval "$(_SWH_JOURNAL_COMPLETE=$autocomplete_cmd swh-journal)" ``` Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/debian/changelog b/debian/changelog index cd7162c..236beee 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,397 +1,401 @@ -swh-journal (0.4.0-1~swh1~bpo10+1) buster-swh; urgency=medium +swh-journal (0.4.1-1~swh1) unstable-swh; urgency=medium - * Rebuild for buster-swh + * New upstream release 0.4.1 - (tagged by Valentin Lorentz + on 2020-07-31 11:25:19 +0200) + * Upstream changes: - v0.4.1 - * Remove TEST_OBJECT_DICTS, use + only TEST_OBJECTS. - * Add support for MetadataAuthority, + MetadataFetcher, and RawExtrinsicMetadata. - -- Software Heritage autobuilder (on jenkins-debian1) Mon, 06 Jul 2020 11:59:16 +0000 + -- Software Heritage autobuilder (on jenkins-debian1) Fri, 31 Jul 2020 09:31:32 +0000 swh-journal (0.4.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.4.0 - (tagged by David Douard on 2020-07-06 13:49:40 +0200) * Upstream changes: - v0.4.0 -- Software Heritage autobuilder (on jenkins-debian1) Mon, 06 Jul 2020 11:58:06 +0000 swh-journal (0.3.5-1~swh1) unstable-swh; urgency=medium * New upstream release 0.3.5 - (tagged by Antoine R. Dumont (@ardumont) on 2020-07-01 15:55:31 +0200) * Upstream changes: - v0.3.5 - journal_data: Drop obsolete origin_visit fields - Use proper hash ids in tests' journal_data -- Software Heritage autobuilder (on jenkins-debian1) Wed, 01 Jul 2020 14:00:27 +0000 swh-journal (0.3.4-1~swh1) unstable-swh; urgency=medium * New upstream release 0.3.4 - (tagged by Antoine R. 
Dumont (@ardumont) on 2020-06-25 10:10:16 +0200) * Upstream changes: - v0.3.4 - Drop datetime conversion indirection -- Software Heritage autobuilder (on jenkins-debian1) Thu, 25 Jun 2020 08:13:02 +0000 swh-journal (0.3.3-1~swh1) unstable-swh; urgency=medium * New upstream release 0.3.3 - (tagged by Antoine R. Dumont (@ardumont) on 2020-06-25 09:34:32 +0200) * Upstream changes: - v0.3.3 - journal_data: Make origin-visit optional fields to None -- Software Heritage autobuilder (on jenkins-debian1) Thu, 25 Jun 2020 07:36:28 +0000 swh-journal (0.3.2-1~swh1) unstable-swh; urgency=medium * New upstream release 0.3.2 - (tagged by David Douard on 2020-06-17 09:29:44 +0200) * Upstream changes: - v0.3.2 -- Software Heritage autobuilder (on jenkins-debian1) Wed, 17 Jun 2020 07:37:10 +0000 swh-journal (0.3.1-1~swh1) unstable-swh; urgency=medium * New upstream release 0.3.1 - (tagged by Antoine R. Dumont (@ardumont) on 2020-06-10 10:54:44 +0200) * Upstream changes: - v0.3.1 - pytest_plugin: pprint key when assertion is not respected -- Software Heritage autobuilder (on jenkins-debian1) Wed, 10 Jun 2020 08:56:57 +0000 swh-journal (0.3.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.3.0 - (tagged by Antoine R. Dumont (@ardumont) on 2020-06-09 10:42:30 +0200) * Upstream changes: - v0.3.0 - Allow journal to deal with origin_visit_status - test: Use origin-visit date field as datetime to phase out iso8601 str -- Software Heritage autobuilder (on jenkins-debian1) Tue, 09 Jun 2020 08:44:44 +0000 swh-journal (0.2.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.2.0 - (tagged by David Douard on 2020-06-03 13:50:54 +0200) * Upstream changes: - v0.2.0 -- Software Heritage autobuilder (on jenkins-debian1) Wed, 03 Jun 2020 12:00:31 +0000 swh-journal (0.1.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.1.0 - (tagged by David Douard on 2020-05-07 10:33:18 +0200) * Upstream changes: - v0.1.0 -- Software Heritage autobuilder (on jenkins-debian1) Thu, 07 May 2020 08:35:49 +0000 swh-journal (0.0.32-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.32 - (tagged by Antoine R. Dumont (@ardumont) on 2020-05-04 18:02:45 +0200) * Upstream changes: - v0.0.32 - serializers: Make kafka_to_key implem compatible with stable version - setup.py: add documentation link - Remove the content replayer code - Remove the backfiller and the (storage) replayer -- Software Heritage autobuilder (on jenkins-debian1) Mon, 04 May 2020 16:04:38 +0000 swh-journal (0.0.31-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.31 - (tagged by David Douard on 2020-04-23 12:27:47 +0200) * Upstream changes: - v0.0.31 -- Software Heritage autobuilder (on jenkins-debian1) Thu, 23 Apr 2020 10:37:28 +0000 swh-journal (0.0.30-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.30 - (tagged by Nicolas Dandrimont on 2020-04-14 15:57:34 +0200) * Upstream changes: - Release swh.journal v0.0.30 - various test refactorings - enable black on the source code - accept swh.model objects in the journal writers - listen to delivery notifications after writing messages - default to message.max.bytes = 100 MB in the kafka writer -- Software Heritage autobuilder (on jenkins-debian1) Tue, 14 Apr 2020 14:10:54 +0000 swh-journal (0.0.29-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.29 - (tagged by Antoine R. 
Dumont (@ardumont) on 2020-03-27 10:40:46 +0100) * Upstream changes: - v0.0.29 - replayer: Allow legacy origin to be replayed - tests.utils: Test from modules using this need to call close method - tests: Adapt model according to latest change -- Software Heritage autobuilder (on jenkins-debian1) Fri, 27 Mar 2020 09:51:08 +0000 swh-journal (0.0.28-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.28 - (tagged by Antoine R. Dumont (@ardumont) on 2020-03-25 10:14:47 +0100) * Upstream changes: - v0.0.28 - journal.replay: Migrate to latest HashCollision change - replayer: factor out legacy objects fixers - journal.replay: Inline `_fix_origin_visit` for loop in insert_object - journal.replay: Align _fix_content with other fix methods - journal.replay: Align fix revision behavior to other fix methods - Remove extra 'perms' key of contents when replaying. -- Software Heritage autobuilder (on jenkins-debian1) Wed, 25 Mar 2020 09:23:55 +0000 swh-journal (0.0.27-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.27 - (tagged by Antoine R. Dumont (@ardumont) on 2020-03-16 16:09:33 +0100) * Upstream changes: - v0.0.27 - Migrate to latest origin_visit_upsert/add api changes - journal: Use swh-model objects instead of dicts in replay and writer - replay: Filter out colliding contents when replaying - Use better kafka producer semantics in the journal writers - Make the number of messages processed at a time by journal clients - configurable - Drop deprecated cli options - Replace deprecated options with a config file override in cli tests - Clean up the signature of test_cli's invoke method - Migrate test cli config to a dict instead of raw yaml - kafka: normalize KafkaJournalWriter.write_addition[s] API - Rename JournalClient.max_messages to JournalClient.stop_after_objects - Be more careful with content generation in test_write_replay - Add type annotations to swh.journal.client arguments - Unify tense for content replay statistics log entry - Add missing log4j.properties file from MANIFEST.in - Unify retry/error handling for content replay -- Software Heritage autobuilder (on jenkins-debian1) Mon, 16 Mar 2020 15:17:43 +0000 swh-journal (0.0.26-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.26 - (tagged by David Douard on 2020-03-06 15:36:23 +0100) * Upstream changes: - v0.0.26 -- Software Heritage autobuilder (on jenkins-debian1) Fri, 06 Mar 2020 14:47:18 +0000 swh-journal (0.0.25-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.25 - (tagged by Valentin Lorentz on 2020-01-30 15:35:41 +0100) * Upstream changes: - v0.0.25 - * Add support for swh-storage v0.0.168. - * Accept None dates when validating revisions. 
-- Software Heritage autobuilder (on jenkins-debian1) Thu, 30 Jan 2020 14:43:25 +0000 swh-journal (0.0.24-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.24 - (tagged by Antoine Lambert on 2020-01-06 16:14:32 +0100) * Upstream changes: - version 0.0.24 -- Software Heritage autobuilder (on jenkins-debian1) Mon, 06 Jan 2020 15:20:54 +0000 swh-journal (0.0.23-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.23 - (tagged by Nicolas Dandrimont on 2020-01-03 20:02:39 +0100) * Upstream changes: - Release swh.journal v0.0.23 - support short-hand syntax for initializing the journal direct writer -- Software Heritage autobuilder (on jenkins-debian1) Fri, 03 Jan 2020 19:06:31 +0000 swh-journal (0.0.21-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.21 - (tagged by David Douard on 2019-11-29 15:37:49 +0100) * Upstream changes: - v0.0.21 -- Software Heritage autobuilder (on jenkins-debian1) Mon, 02 Dec 2019 14:12:49 +0000 swh-journal (0.0.20-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.20 - (tagged by David Douard on 2019-11-29 12:06:13 +0100) * Upstream changes: - v0.0.20 -- Software Heritage autobuilder (on jenkins-debian1) Fri, 29 Nov 2019 11:10:44 +0000 swh-journal (0.0.19-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.19 - (tagged by Nicolas Dandrimont on 2019-11-07 14:33:58 +0100) * Upstream changes: - Release swh.journal v0.0.19 - Merge several reliability fixes -- Software Heritage autobuilder (on jenkins-debian1) Thu, 07 Nov 2019 13:37:23 +0000 swh-journal (0.0.18-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.18 - (tagged by Stefano Zacchiroli on 2019-10-01 10:19:26 +0200) * Upstream changes: - v0.0.18 - * tox: anticipate mypy run to just after flake8 - * init.py: switch to documented way of extending path - * typing: minimal changes to make a no-op mypy run pass - * writer: Normalize 'cls' value to 'memory' for in- memory instantiation - * Add a test directly for the journal client. 
-- Software Heritage autobuilder (on jenkins-debian1) Tue, 01 Oct 2019 11:12:03 +0000 swh-journal (0.0.17-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.17 - (tagged by Nicolas Dandrimont on 2019-09-18 18:01:50 +0200) * Upstream changes: - Release swh.journal v0.0.17 - Cleanup fallout from confluent_kafka migration - Better error handling and logging in direct_writer - Backfiller fixups - More extensive mock for KafkaConsumer -- Software Heritage autobuilder (on jenkins-debian1) Wed, 18 Sep 2019 16:08:20 +0000 swh-journal (0.0.16-1~swh2) unstable-swh; urgency=medium * Migrate to confluent-kafka -- Nicolas Dandrimont Fri, 13 Sep 2019 20:11:24 +0200 swh-journal (0.0.16-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.16 - (tagged by Nicolas Dandrimont on 2019-09-13 14:28:50 +0200) * Upstream changes: - Release swh.journal v0.0.16: - Migrate to confluent-kafka from python-kafka -- Software Heritage autobuilder (on jenkins-debian1) Fri, 13 Sep 2019 12:34:35 +0000 swh-journal (0.0.15-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.15 - (tagged by David Douard on 2019-09-10 16:50:42 +0200) * Upstream changes: - v0.0.15 -- Software Heritage autobuilder (on jenkins-debian1) Tue, 10 Sep 2019 14:53:55 +0000 swh-journal (0.0.14-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.14 - (tagged by David Douard on 2019-07-18 13:38:39 +0200) * Upstream changes: - 0.0.14 - Code of conduct - fix the backfiller - fix compatibility with click < 7 - make the replayer robust against old formats -- Software Heritage autobuilder (on jenkins-debian1) Thu, 18 Jul 2019 11:44:40 +0000 swh-journal (0.0.13-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.13 - (tagged by Antoine R. Dumont (@ardumont) on 2019-07-03 10:26:29 +0200) * Upstream changes: - v0.0.13 - cli: Document depreated options -- Software Heritage autobuilder (on jenkins-debian1) Wed, 03 Jul 2019 08:33:57 +0000 swh-journal (0.0.12-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.12 - (tagged by Valentin Lorentz on 2019-07-02 11:58:00 +0200) * Upstream changes: - v0.0.12 - More CLI option - Replay parallelism - Fix build on Debian 9 -- Software Heritage autobuilder (on jenkins-debian1) Tue, 02 Jul 2019 10:08:07 +0000 swh-journal (0.0.11-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.11 - (tagged by David Douard on 2019-06-12 13:58:14 +0200) * Upstream changes: - v0.0.11 -- Software Heritage autobuilder (on jenkins-debian1) Wed, 12 Jun 2019 12:10:52 +0000 swh-journal (0.0.10-1~swh2) unstable-swh; urgency=medium * Disable tests at build-time -- Nicolas Dandrimont Thu, 09 May 2019 14:42:24 +0200 swh-journal (0.0.10-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.10 - (tagged by Nicolas Dandrimont on 2019-05-09 14:29:52 +0200) * Upstream changes: - Release swh.journal v0.0.10 - Remove the publisher component, introduce the backfiller component. -- Software Heritage autobuilder (on jenkins-debian1) Thu, 09 May 2019 12:34:36 +0000 swh-journal (0.0.9-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.9 - (tagged by David Douard on 2019-04-10 13:42:32 +0200) * Upstream changes: - v0.0.9 -- Software Heritage autobuilder (on jenkins-debian1) Wed, 10 Apr 2019 11:48:39 +0000 swh-journal (0.0.8-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.8 - (tagged by Antoine R. 
Dumont (@ardumont) on 2019-03-15 13:56:57 +0100) * Upstream changes: - v0.0.8 - Add swh-journal cli -- Software Heritage autobuilder (on jenkins-debian1) Fri, 15 Mar 2019 13:00:11 +0000 swh-journal (0.0.7-1~swh2) unstable-swh; urgency=low * New release fixing build dependencies -- Antoine Romain Dumont (@ardumont) Tue, 19 Feb 2019 14:18:06 +0100 swh-journal (0.0.7-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.7 - (tagged by Antoine R. Dumont (@ardumont) on 2019-01-11 11:53:44 +0100) * Upstream changes: - v0.0.7 - Fix off-by-one error when checking max_messages. - tests: Adapt tests according to latest in-memory storage changes -- Software Heritage autobuilder (on jenkins-debian1) Fri, 11 Jan 2019 10:56:46 +0000 swh-journal (0.0.4-1~swh1) unstable-swh; urgency=medium * Release swh.journal version 0.0.4 * Update packaging runes -- Nicolas Dandrimont Thu, 12 Oct 2017 19:01:53 +0200 swh-journal (0.0.3-1~swh1) unstable-swh; urgency=medium * Release swh.journal v0.0.3 * Prepare building for stretch -- Nicolas Dandrimont Fri, 30 Jun 2017 17:29:15 +0200 swh-journal (0.0.2-1~swh1) unstable-swh; urgency=medium * v0.0.2 * Adapt swh.journal.publisher * Adapt swh.journal.client * Add swh.journal.checker basic implementation (reads and sends all * objects to publisher's subscribed queues). -- Antoine R. Dumont (@ardumont) Fri, 24 Mar 2017 12:54:16 +0100 swh-journal (0.0.1-1~swh1) unstable-swh; urgency=medium * Initial release * v0.0.1 * Add a journal publisher * Add a base class interface for journal clients -- Antoine R. Dumont (@ardumont) Tue, 21 Mar 2017 14:38:13 +0100 diff --git a/requirements-swh.txt b/requirements-swh.txt index 34619f9..2c6dabc 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,2 +1,2 @@ swh.core[db,http] >= 0.0.60 -swh.model >= 0.3.6 +swh.model >= 0.6.1 diff --git a/swh.journal.egg-info/PKG-INFO b/swh.journal.egg-info/PKG-INFO index df08fb4..4ee5d77 100644 --- a/swh.journal.egg-info/PKG-INFO +++ b/swh.journal.egg-info/PKG-INFO @@ -1,72 +1,72 @@ Metadata-Version: 2.1 Name: swh.journal -Version: 0.4.0 +Version: 0.4.1 Summary: Software Heritage Journal utilities Home-page: https://forge.softwareheritage.org/diffusion/DJNL/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-journal Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-journal/ Description: swh-journal =========== Persistent logger of changes to the archive, with publish-subscribe support. See the [documentation](https://docs.softwareheritage.org/devel/swh-journal/index.html#software-heritage-journal) for more details. # Local test As a pre-requisite, you need a kakfa installation path. 
The following target will take care of this: ``` make install ``` Then, provided you are in the right virtual environment as described in the [swh getting-started](https://docs.softwareheritage.org/devel/developer-setup.html#developer-setup): ``` pytest ``` or: ``` tox ``` # Running ## publisher Command: ``` $ swh-journal --config-file ~/.config/swh/journal/publisher.yml \ publisher ``` # Auto-completion To have the completion, add the following in your ~/.virtualenvs/swh/bin/postactivate: ``` eval "$(_SWH_JOURNAL_COMPLETE=$autocomplete_cmd swh-journal)" ``` Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/swh.journal.egg-info/requires.txt b/swh.journal.egg-info/requires.txt index ab6ffea..e10e2e0 100644 --- a/swh.journal.egg-info/requires.txt +++ b/swh.journal.egg-info/requires.txt @@ -1,10 +1,10 @@ confluent-kafka msgpack tenacity vcversioner swh.core[db,http]>=0.0.60 -swh.model>=0.3.6 +swh.model>=0.6.1 [testing] pytest hypothesis diff --git a/swh/journal/pytest_plugin.py b/swh/journal/pytest_plugin.py index 36af3a1..a546bbb 100644 --- a/swh/journal/pytest_plugin.py +++ b/swh/journal/pytest_plugin.py @@ -1,246 +1,241 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import random import string from typing import Collection, Dict, Iterator, Optional from collections import defaultdict import attr import pytest from confluent_kafka import Consumer, KafkaException, Producer from confluent_kafka.admin import AdminClient from swh.journal.serializers import object_key, kafka_to_key, kafka_to_value, pprint_key -from swh.journal.tests.journal_data import ( - TEST_OBJECTS, - TEST_OBJECT_DICTS, - MODEL_OBJECTS, -) +from swh.journal.tests.journal_data import TEST_OBJECTS def consume_messages(consumer, kafka_prefix, expected_messages): """Consume expected_messages from the consumer; Sort them all into a consumed_objects dict""" consumed_messages = defaultdict(list) fetched_messages = 0 retries_left = 1000 while fetched_messages < expected_messages: if retries_left == 0: raise ValueError( "Timed out fetching messages from kafka. " f"Only {fetched_messages}/{expected_messages} fetched" ) msg = consumer.poll(timeout=0.01) if not msg: retries_left -= 1 continue error = msg.error() if error is not None: if error.fatal(): raise KafkaException(error) retries_left -= 1 continue fetched_messages += 1 topic = msg.topic() assert topic.startswith(f"{kafka_prefix}.") or topic.startswith( f"{kafka_prefix}_privileged." ), "Unexpected topic" object_type = topic[len(kafka_prefix + ".") :] consumed_messages[object_type].append( (kafka_to_key(msg.key()), kafka_to_value(msg.value())) ) return consumed_messages def assert_all_objects_consumed( consumed_messages: Dict, exclude: Optional[Collection] = None ): """Check whether all objects from TEST_OBJECTS have been consumed `exclude` can be a list of object types for which we do not want to compare the values (eg. for anonymized object). 
""" for object_type, known_objects in TEST_OBJECTS.items(): known_keys = [object_key(object_type, obj) for obj in known_objects] if not consumed_messages[object_type]: return (received_keys, received_values) = zip(*consumed_messages[object_type]) if object_type in ("content", "skipped_content"): for value in received_values: del value["ctime"] if object_type == "content": known_objects = [attr.evolve(o, data=None) for o in known_objects] for key in known_keys: assert key in received_keys, ( f"expected {object_type} key {pprint_key(key)} " "absent from consumed messages" ) if exclude and object_type in exclude: continue - received_objects = [ - MODEL_OBJECTS[object_type].from_dict(d) for d in received_values - ] - for value in known_objects: - assert value in received_objects, ( + expected_value = value.to_dict() + if value.object_type in ("content", "skipped_content"): + del expected_value["ctime"] + assert expected_value in received_values, ( f"expected {object_type} value {value!r} is " "absent from consumed messages" ) @pytest.fixture(scope="function") def kafka_prefix(): """Pick a random prefix for kafka topics on each call""" return "".join(random.choice(string.ascii_lowercase) for _ in range(10)) @pytest.fixture(scope="function") def kafka_consumer_group(kafka_prefix: str): """Pick a random consumer group for kafka consumers on each call""" return "test-consumer-%s" % kafka_prefix @pytest.fixture(scope="function") def object_types(): """Set of object types to precreate topics for.""" - return set(TEST_OBJECT_DICTS.keys()) + return set(TEST_OBJECTS.keys()) @pytest.fixture(scope="function") def privileged_object_types(): """Set of object types to precreate privileged topics for.""" return {"revision", "release"} @pytest.fixture(scope="function") def kafka_server( kafka_server_base: str, kafka_prefix: str, object_types: Iterator[str], privileged_object_types: Iterator[str], ) -> str: """A kafka server with existing topics Unprivileged topics are built as ``{kafka_prefix}.{object_type}`` with object_type from the ``object_types`` list. Privileged topics are built as ``{kafka_prefix}_privileged.{object_type}`` with object_type from the ``privileged_object_types`` list. """ topics = [f"{kafka_prefix}.{obj}" for obj in object_types] + [ f"{kafka_prefix}_privileged.{obj}" for obj in privileged_object_types ] # unfortunately, the Mock broker does not support the CreatTopic admin API, so we # have to create topics using a Producer. producer = Producer( { "bootstrap.servers": kafka_server_base, "client.id": "bootstrap producer", "acks": "all", } ) for topic in topics: producer.produce(topic=topic, value=None) for i in range(10): if producer.flush(0.1) == 0: break return kafka_server_base @pytest.fixture(scope="session") def kafka_server_base() -> Iterator[str]: """Create a mock kafka cluster suitable for tests. Yield a connection string. Note: this is a generator to keep the mock broker alive during the whole test session. see https://github.com/edenhill/librdkafka/blob/master/src/rdkafka_mock.h """ admin = AdminClient({"test.mock.num.brokers": "1"}) metadata = admin.list_topics() brokers = [str(broker) for broker in metadata.brokers.values()] assert len(brokers) == 1, "More than one broker found in the kafka cluster?!" 
broker_connstr, broker_id = brokers[0].split("/") yield broker_connstr TEST_CONFIG = { "consumer_id": "swh.journal.consumer", "stop_after_objects": 1, # will read 1 object and stop "storage": {"cls": "memory", "args": {}}, } @pytest.fixture def test_config( kafka_server_base: str, kafka_prefix: str, object_types: Iterator[str], privileged_object_types: Iterator[str], ): """Test configuration needed for producer/consumer """ return { **TEST_CONFIG, "object_types": object_types, "privileged_object_types": privileged_object_types, "brokers": [kafka_server_base], "prefix": kafka_prefix, } @pytest.fixture def consumer( kafka_server: str, test_config: Dict, kafka_consumer_group: str ) -> Consumer: """Get a connected Kafka consumer. """ consumer = Consumer( { "bootstrap.servers": kafka_server, "auto.offset.reset": "earliest", "enable.auto.commit": True, "group.id": kafka_consumer_group, } ) prefix = test_config["prefix"] kafka_topics = [ f"{prefix}.{object_type}" for object_type in test_config["object_types"] ] + [ f"{prefix}_privileged.{object_type}" for object_type in test_config["privileged_object_types"] ] consumer.subscribe(kafka_topics) yield consumer consumer.close() diff --git a/swh/journal/serializers.py b/swh/journal/serializers.py index 6f32343..399a29c 100644 --- a/swh/journal/serializers.py +++ b/swh/journal/serializers.py @@ -1,127 +1,171 @@ # Copyright (C) 2016-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Any, Dict, Union, overload import msgpack from swh.core.api.serializers import msgpack_dumps, msgpack_loads from swh.model.hashutil import DEFAULT_ALGORITHMS from swh.model.model import ( Content, Directory, + MetadataAuthority, + MetadataFetcher, Origin, OriginVisit, + OriginVisitStatus, + RawExtrinsicMetadata, Release, Revision, SkippedContent, Snapshot, ) ModelObject = Union[ - Content, Directory, Origin, OriginVisit, Release, Revision, SkippedContent, Snapshot + Content, + Directory, + MetadataAuthority, + MetadataFetcher, + Origin, + OriginVisit, + OriginVisitStatus, + RawExtrinsicMetadata, + Release, + Revision, + SkippedContent, + Snapshot, ] KeyType = Union[Dict[str, str], Dict[str, bytes], bytes] # these @overload'ed versions of the object_key method aim at helping mypy figuring # the correct type-ing. @overload def object_key( object_type: str, object_: Union[Content, Directory, Revision, Release, Snapshot] ) -> bytes: ... @overload def object_key( object_type: str, object_: Union[Origin, SkippedContent] ) -> Dict[str, bytes]: ... @overload -def object_key(object_type: str, object_: OriginVisit) -> Dict[str, str]: +def object_key( + object_type: str, + object_: Union[ + MetadataAuthority, + MetadataFetcher, + OriginVisit, + OriginVisitStatus, + RawExtrinsicMetadata, + ], +) -> Dict[str, str]: ... 
def object_key(object_type: str, object_) -> KeyType: if object_type in ("revision", "release", "directory", "snapshot"): return object_.id elif object_type == "content": return object_.sha1 # TODO: use a dict of hashes elif object_type == "skipped_content": return {hash: getattr(object_, hash) for hash in DEFAULT_ALGORITHMS} elif object_type == "origin": return {"url": object_.url} elif object_type == "origin_visit": return { "origin": object_.origin, "date": str(object_.date), } elif object_type == "origin_visit_status": return { "origin": object_.origin, "visit": str(object_.visit), "date": str(object_.date), } + elif object_type == "metadata_authority": + return { + "type": object_.type.value, + "url": object_.url, + } + elif object_type == "metadata_fetcher": + return { + "name": object_.name, + "version": object_.version, + } + elif object_type == "raw_extrinsic_metadata": + return { + "type": object_.type.value, + "id": str(object_.id), + "authority_type": object_.authority.type.value, + "authority_url": object_.authority.url, + "discovery_date": str(object_.discovery_date), + "fetcher_name": object_.fetcher.name, + "fetcher_version": object_.fetcher.version, + } else: raise ValueError("Unknown object type: %s." % object_type) def stringify_key_item(k: str, v: Union[str, bytes]) -> str: """Turn the item of a dict key into a string""" if isinstance(v, str): return v if k == "url": return v.decode("utf-8") return v.hex() def pprint_key(key: KeyType) -> str: """Pretty-print a kafka key""" if isinstance(key, dict): return "{%s}" % ", ".join( f"{k}: {stringify_key_item(k, v)}" for k, v in key.items() ) elif isinstance(key, bytes): return key.hex() else: return key def key_to_kafka(key: KeyType) -> bytes: """Serialize a key, possibly a dict, in a predictable way""" p = msgpack.Packer(use_bin_type=True) if isinstance(key, dict): return p.pack_map_pairs(sorted(key.items())) else: return p.pack(key) def kafka_to_key(kafka_key: bytes) -> KeyType: """Deserialize a key""" return msgpack.loads(kafka_key, raw=False) def value_to_kafka(value: Any) -> bytes: """Serialize some data for storage in kafka""" return msgpack_dumps(value) def kafka_to_value(kafka_value: bytes) -> Any: """Deserialize some data stored in kafka""" value = msgpack_loads(kafka_value) if isinstance(value, list): return tuple(value) if isinstance(value, dict): return ensure_tuples(value) return value def ensure_tuples(value: Dict) -> Dict: return {k: tuple(v) if isinstance(v, list) else v for k, v in value.items()} diff --git a/swh/journal/tests/journal_data.py b/swh/journal/tests/journal_data.py index 4aad169..1b103bf 100644 --- a/swh/journal/tests/journal_data.py +++ b/swh/journal/tests/journal_data.py @@ -1,325 +1,342 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime -from typing import Any, Dict, List, Type +from typing import Dict, Sequence -from swh.model.hashutil import MultiHash, hash_to_bytes +import attr + +from swh.model.hashutil import MultiHash, hash_to_bytes, hash_to_hex from swh.journal.serializers import ModelObject +from swh.model.identifiers import SWHID from swh.model.model import ( - BaseModel, - Content, - Directory, - Origin, - OriginVisit, - OriginVisitStatus, - Release, - Revision, - SkippedContent, - Snapshot, -) - -MODEL_CLASSES = ( Content, Directory, + DirectoryEntry, + 
MetadataAuthority, + MetadataAuthorityType, + MetadataFetcher, + MetadataTargetType, + ObjectType, Origin, OriginVisit, OriginVisitStatus, + Person, + RawExtrinsicMetadata, Release, Revision, + RevisionType, SkippedContent, Snapshot, + SnapshotBranch, + TargetType, + Timestamp, + TimestampWithTimezone, ) -OBJECT_TYPES: Dict[Type[BaseModel], str] = { - cls: cls.object_type for cls in MODEL_CLASSES # type: ignore -} -MODEL_OBJECTS: Dict[str, Type[BaseModel]] = { - cls.object_type: cls for cls in MODEL_CLASSES # type: ignore -} - UTC = datetime.timezone.utc CONTENTS = [ - { + Content( + length=4, + data=f"foo{i}".encode(), + status="visible", **MultiHash.from_data(f"foo{i}".encode()).digest(), - "length": 4, - "data": f"foo{i}".encode(), - "status": "visible", - } + ) for i in range(10) ] + [ - { + Content( + length=14, + data=f"forbidden foo{i}".encode(), + status="hidden", **MultiHash.from_data(f"forbidden foo{i}".encode()).digest(), - "length": 14, - "data": f"forbidden foo{i}".encode(), - "status": "hidden", - } + ) for i in range(10) ] SKIPPED_CONTENTS = [ - { + SkippedContent( + length=4, + status="absent", + reason=f"because chr({i}) != '*'", **MultiHash.from_data(f"bar{i}".encode()).digest(), - "length": 4, - "status": "absent", - "reason": f"because chr({i}) != '*'", - } + ) for i in range(2) ] -duplicate_content1 = { - "length": 4, - "sha1": hash_to_bytes("44973274ccef6ab4dfaaf86599792fa9c3fe4689"), - "sha1_git": b"another-foo", - "blake2s256": b"another-bar", - "sha256": b"another-baz", - "status": "visible", -} +duplicate_content1 = Content( + length=4, + sha1=hash_to_bytes("44973274ccef6ab4dfaaf86599792fa9c3fe4689"), + sha1_git=b"another-foo", + blake2s256=b"another-bar", + sha256=b"another-baz", + status="visible", +) # Craft a sha1 collision -duplicate_content2 = duplicate_content1.copy() -sha1_array = bytearray(duplicate_content1["sha1_git"]) +sha1_array = bytearray(duplicate_content1.sha1_git) sha1_array[0] += 1 -duplicate_content2["sha1_git"] = bytes(sha1_array) +duplicate_content2 = attr.evolve(duplicate_content1, sha1_git=bytes(sha1_array)) DUPLICATE_CONTENTS = [duplicate_content1, duplicate_content2] COMMITTERS = [ - {"fullname": b"foo", "name": b"foo", "email": b"",}, - {"fullname": b"bar", "name": b"bar", "email": b"",}, + Person(fullname=b"foo", name=b"foo", email=b""), + Person(fullname=b"bar", name=b"bar", email=b""), ] DATES = [ - { - "timestamp": {"seconds": 1234567891, "microseconds": 0,}, - "offset": 120, - "negative_utc": False, - }, - { - "timestamp": {"seconds": 1234567892, "microseconds": 0,}, - "offset": 120, - "negative_utc": False, - }, + TimestampWithTimezone( + timestamp=Timestamp(seconds=1234567891, microseconds=0,), + offset=120, + negative_utc=False, + ), + TimestampWithTimezone( + timestamp=Timestamp(seconds=1234567892, microseconds=0,), + offset=120, + negative_utc=False, + ), ] REVISIONS = [ - { - "id": hash_to_bytes("4ca486e65eb68e4986aeef8227d2db1d56ce51b3"), - "message": b"hello", - "date": DATES[0], - "committer": COMMITTERS[0], - "author": COMMITTERS[0], - "committer_date": DATES[0], - "type": "git", - "directory": b"\x01" * 20, - "synthetic": False, - "metadata": None, - "parents": (), - }, - { - "id": hash_to_bytes("677063f5c405d6fc1781fc56379c9a9adf43d3a0"), - "message": b"hello again", - "date": DATES[1], - "committer": COMMITTERS[1], - "author": COMMITTERS[1], - "committer_date": DATES[1], - "type": "hg", - "directory": b"\x02" * 20, - "synthetic": False, - "metadata": None, - "parents": (), - }, + Revision( + 
id=hash_to_bytes("4ca486e65eb68e4986aeef8227d2db1d56ce51b3"), + message=b"hello", + date=DATES[0], + committer=COMMITTERS[0], + author=COMMITTERS[0], + committer_date=DATES[0], + type=RevisionType.GIT, + directory=b"\x01" * 20, + synthetic=False, + metadata=None, + parents=(), + ), + Revision( + id=hash_to_bytes("677063f5c405d6fc1781fc56379c9a9adf43d3a0"), + message=b"hello again", + date=DATES[1], + committer=COMMITTERS[1], + author=COMMITTERS[1], + committer_date=DATES[1], + type=RevisionType.MERCURIAL, + directory=b"\x02" * 20, + synthetic=False, + metadata=None, + parents=(), + ), ] RELEASES = [ - { - "id": hash_to_bytes("8059dc4e17fcd0e51ca3bcd6b80f4577d281fd08"), - "name": b"v0.0.1", - "date": { - "timestamp": {"seconds": 1234567890, "microseconds": 0,}, - "offset": 120, - "negative_utc": False, - }, - "author": COMMITTERS[0], - "target_type": "revision", - "target": b"\x04" * 20, - "message": b"foo", - "synthetic": False, - }, + Release( + id=hash_to_bytes("8059dc4e17fcd0e51ca3bcd6b80f4577d281fd08"), + name=b"v0.0.1", + date=TimestampWithTimezone( + timestamp=Timestamp(seconds=1234567890, microseconds=0,), + offset=120, + negative_utc=False, + ), + author=COMMITTERS[0], + target_type=ObjectType.REVISION, + target=b"\x04" * 20, + message=b"foo", + synthetic=False, + ), ] ORIGINS = [ - {"url": "https://somewhere.org/den/fox",}, - {"url": "https://overtherainbow.org/fox/den",}, + Origin(url="https://somewhere.org/den/fox",), + Origin(url="https://overtherainbow.org/fox/den",), ] ORIGIN_VISITS = [ - { - "origin": ORIGINS[0]["url"], - "date": datetime.datetime(2013, 5, 7, 4, 20, 39, 369271, tzinfo=UTC), - "visit": 1, - "type": "git", - }, - { - "origin": ORIGINS[1]["url"], - "date": datetime.datetime(2014, 11, 27, 17, 20, 39, tzinfo=UTC), - "visit": 1, - "type": "hg", - }, - { - "origin": ORIGINS[0]["url"], - "date": datetime.datetime(2018, 11, 27, 17, 20, 39, tzinfo=UTC), - "visit": 2, - "type": "git", - }, - { - "origin": ORIGINS[0]["url"], - "date": datetime.datetime(2018, 11, 27, 17, 20, 39, tzinfo=UTC), - "visit": 3, - "type": "git", - }, - { - "origin": ORIGINS[1]["url"], - "date": datetime.datetime(2015, 11, 27, 17, 20, 39, tzinfo=UTC), - "visit": 2, - "type": "hg", - }, + OriginVisit( + origin=ORIGINS[0].url, + date=datetime.datetime(2013, 5, 7, 4, 20, 39, 369271, tzinfo=UTC), + visit=1, + type="git", + ), + OriginVisit( + origin=ORIGINS[1].url, + date=datetime.datetime(2014, 11, 27, 17, 20, 39, tzinfo=UTC), + visit=1, + type="hg", + ), + OriginVisit( + origin=ORIGINS[0].url, + date=datetime.datetime(2018, 11, 27, 17, 20, 39, tzinfo=UTC), + visit=2, + type="git", + ), + OriginVisit( + origin=ORIGINS[0].url, + date=datetime.datetime(2018, 11, 27, 17, 20, 39, tzinfo=UTC), + visit=3, + type="git", + ), + OriginVisit( + origin=ORIGINS[1].url, + date=datetime.datetime(2015, 11, 27, 17, 20, 39, tzinfo=UTC), + visit=2, + type="hg", + ), ] # The origin-visit-status dates needs to be shifted slightly in the future from their # visit dates counterpart. Otherwise, we are hitting storage-wise the "on conflict" # ignore policy (because origin-visit-add creates an origin-visit-status with the same # parameters from the origin-visit {origin, visit, date}... 
ORIGIN_VISIT_STATUSES = [ - { - "origin": ORIGINS[0]["url"], - "date": datetime.datetime(2013, 5, 7, 4, 20, 39, 432222, tzinfo=UTC), - "visit": 1, - "status": "ongoing", - "snapshot": None, - "metadata": None, - }, - { - "origin": ORIGINS[1]["url"], - "date": datetime.datetime(2014, 11, 27, 17, 21, 12, tzinfo=UTC), - "visit": 1, - "status": "ongoing", - "snapshot": None, - "metadata": None, - }, - { - "origin": ORIGINS[0]["url"], - "date": datetime.datetime(2018, 11, 27, 17, 20, 59, tzinfo=UTC), - "visit": 2, - "status": "ongoing", - "snapshot": None, - "metadata": None, - }, - { - "origin": ORIGINS[0]["url"], - "date": datetime.datetime(2018, 11, 27, 17, 20, 49, tzinfo=UTC), - "visit": 3, - "status": "full", - "snapshot": hash_to_bytes("17d0066a4a80aba4a0e913532ee8ff2014f006a9"), - "metadata": None, - }, - { - "origin": ORIGINS[1]["url"], - "date": datetime.datetime(2015, 11, 27, 17, 22, 18, tzinfo=UTC), - "visit": 2, - "status": "partial", - "snapshot": hash_to_bytes("8ce268b87faf03850693673c3eb5c9bb66e1ca38"), - "metadata": None, - }, + OriginVisitStatus( + origin=ORIGINS[0].url, + date=datetime.datetime(2013, 5, 7, 4, 20, 39, 432222, tzinfo=UTC), + visit=1, + status="ongoing", + snapshot=None, + metadata=None, + ), + OriginVisitStatus( + origin=ORIGINS[1].url, + date=datetime.datetime(2014, 11, 27, 17, 21, 12, tzinfo=UTC), + visit=1, + status="ongoing", + snapshot=None, + metadata=None, + ), + OriginVisitStatus( + origin=ORIGINS[0].url, + date=datetime.datetime(2018, 11, 27, 17, 20, 59, tzinfo=UTC), + visit=2, + status="ongoing", + snapshot=None, + metadata=None, + ), + OriginVisitStatus( + origin=ORIGINS[0].url, + date=datetime.datetime(2018, 11, 27, 17, 20, 49, tzinfo=UTC), + visit=3, + status="full", + snapshot=hash_to_bytes("17d0066a4a80aba4a0e913532ee8ff2014f006a9"), + metadata=None, + ), + OriginVisitStatus( + origin=ORIGINS[1].url, + date=datetime.datetime(2015, 11, 27, 17, 22, 18, tzinfo=UTC), + visit=2, + status="partial", + snapshot=hash_to_bytes("8ce268b87faf03850693673c3eb5c9bb66e1ca38"), + metadata=None, + ), ] DIRECTORIES = [ - {"id": hash_to_bytes("4b825dc642cb6eb9a060e54bf8d69288fbee4904"), "entries": ()}, - { - "id": hash_to_bytes("21416d920e0ebf0df4a7888bed432873ed5cb3a7"), - "entries": ( - { - "name": b"file1.ext", - "perms": 0o644, - "type": "file", - "target": CONTENTS[0]["sha1_git"], - }, - { - "name": b"dir1", - "perms": 0o755, - "type": "dir", - "target": hash_to_bytes("4b825dc642cb6eb9a060e54bf8d69288fbee4904"), - }, - { - "name": b"subprepo1", - "perms": 0o160000, - "type": "rev", - "target": REVISIONS[1]["id"], - }, + Directory(id=hash_to_bytes("4b825dc642cb6eb9a060e54bf8d69288fbee4904"), entries=()), + Directory( + id=hash_to_bytes("21416d920e0ebf0df4a7888bed432873ed5cb3a7"), + entries=( + DirectoryEntry( + name=b"file1.ext", + perms=0o644, + type="file", + target=CONTENTS[0].sha1_git, + ), + DirectoryEntry( + name=b"dir1", + perms=0o755, + type="dir", + target=hash_to_bytes("4b825dc642cb6eb9a060e54bf8d69288fbee4904"), + ), + DirectoryEntry( + name=b"subprepo1", perms=0o160000, type="rev", target=REVISIONS[1].id, + ), ), - }, + ), ] SNAPSHOTS = [ - { - "id": hash_to_bytes("17d0066a4a80aba4a0e913532ee8ff2014f006a9"), - "branches": { - b"master": {"target_type": "revision", "target": REVISIONS[0]["id"]} + Snapshot( + id=hash_to_bytes("17d0066a4a80aba4a0e913532ee8ff2014f006a9"), + branches={ + b"master": SnapshotBranch( + target_type=TargetType.REVISION, target=REVISIONS[0].id + ) }, - }, - { - "id": hash_to_bytes("8ce268b87faf03850693673c3eb5c9bb66e1ca38"), - 
"branches": { - b"target/revision": { - "target_type": "revision", - "target": REVISIONS[0]["id"], - }, - b"target/alias": {"target_type": "alias", "target": b"target/revision"}, - b"target/directory": { - "target_type": "directory", - "target": DIRECTORIES[0]["id"], - }, - b"target/release": {"target_type": "release", "target": RELEASES[0]["id"]}, - b"target/snapshot": { - "target_type": "snapshot", - "target": hash_to_bytes("17d0066a4a80aba4a0e913532ee8ff2014f006a9"), - }, + ), + Snapshot( + id=hash_to_bytes("8ce268b87faf03850693673c3eb5c9bb66e1ca38"), + branches={ + b"target/revision": SnapshotBranch( + target_type=TargetType.REVISION, target=REVISIONS[0].id, + ), + b"target/alias": SnapshotBranch( + target_type=TargetType.ALIAS, target=b"target/revision" + ), + b"target/directory": SnapshotBranch( + target_type=TargetType.DIRECTORY, target=DIRECTORIES[0].id, + ), + b"target/release": SnapshotBranch( + target_type=TargetType.RELEASE, target=RELEASES[0].id + ), + b"target/snapshot": SnapshotBranch( + target_type=TargetType.SNAPSHOT, + target=hash_to_bytes("17d0066a4a80aba4a0e913532ee8ff2014f006a9"), + ), }, - }, + ), +] + + +METADATA_AUTHORITIES = [ + MetadataAuthority( + type=MetadataAuthorityType.FORGE, url="http://example.org/", metadata={}, + ), ] +METADATA_FETCHERS = [ + MetadataFetcher(name="test-fetcher", version="1.0.0", metadata={},) +] -TEST_OBJECT_DICTS: Dict[str, List[Dict[str, Any]]] = { +RAW_EXTRINSIC_METADATA = [ + RawExtrinsicMetadata( + type=MetadataTargetType.ORIGIN, + id="http://example.org/foo.git", + discovery_date=datetime.datetime(2020, 7, 30, 17, 8, 20, tzinfo=UTC), + authority=attr.evolve(METADATA_AUTHORITIES[0], metadata=None), + fetcher=attr.evolve(METADATA_FETCHERS[0], metadata=None), + format="json", + metadata=b'{"foo": "bar"}', + ), + RawExtrinsicMetadata( + type=MetadataTargetType.CONTENT, + id=SWHID(object_type="content", object_id=hash_to_hex(CONTENTS[0].sha1_git)), + discovery_date=datetime.datetime(2020, 7, 30, 17, 8, 20, tzinfo=UTC), + authority=attr.evolve(METADATA_AUTHORITIES[0], metadata=None), + fetcher=attr.evolve(METADATA_FETCHERS[0], metadata=None), + format="json", + metadata=b'{"foo": "bar"}', + ), +] + + +TEST_OBJECTS: Dict[str, Sequence[ModelObject]] = { "content": CONTENTS, "directory": DIRECTORIES, + "metadata_authority": METADATA_AUTHORITIES, + "metadata_fetcher": METADATA_FETCHERS, "origin": ORIGINS, "origin_visit": ORIGIN_VISITS, "origin_visit_status": ORIGIN_VISIT_STATUSES, + "raw_extrinsic_metadata": RAW_EXTRINSIC_METADATA, "release": RELEASES, "revision": REVISIONS, "snapshot": SNAPSHOTS, "skipped_content": SKIPPED_CONTENTS, } - -TEST_OBJECTS: Dict[str, List[ModelObject]] = {} - -for object_type, objects in TEST_OBJECT_DICTS.items(): - converted_objects: List[ModelObject] = [] - model = MODEL_OBJECTS[object_type] - - for (num, obj_d) in enumerate(objects): - if object_type == "content": - obj_d = {**obj_d, "ctime": datetime.datetime.now(tz=UTC)} - - converted_objects.append(model.from_dict(obj_d)) - - TEST_OBJECTS[object_type] = converted_objects diff --git a/swh/journal/tests/test_pytest_plugin.py b/swh/journal/tests/test_pytest_plugin.py index 68d26f0..14e49d7 100644 --- a/swh/journal/tests/test_pytest_plugin.py +++ b/swh/journal/tests/test_pytest_plugin.py @@ -1,68 +1,71 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing 
import Iterator from confluent_kafka.admin import AdminClient def test_kafka_server(kafka_server_base: str): ip, port_str = kafka_server_base.split(":") assert ip == "127.0.0.1" assert int(port_str) admin = AdminClient({"bootstrap.servers": kafka_server_base}) topics = admin.list_topics() assert len(topics.brokers) == 1 def test_kafka_server_with_topics( kafka_server: str, kafka_prefix: str, object_types: Iterator[str], privileged_object_types: Iterator[str], ): admin = AdminClient({"bootstrap.servers": kafka_server}) # check unprivileged topics are present topics = { topic for topic in admin.list_topics().topics if topic.startswith(f"{kafka_prefix}.") } assert topics == {f"{kafka_prefix}.{obj}" for obj in object_types} # check privileged topics are present topics = { topic for topic in admin.list_topics().topics if topic.startswith(f"{kafka_prefix}_privileged.") } assert topics == { f"{kafka_prefix}_privileged.{obj}" for obj in privileged_object_types } def test_test_config(test_config: dict, kafka_prefix: str, kafka_server_base: str): assert test_config == { "consumer_id": "swh.journal.consumer", "stop_after_objects": 1, "storage": {"cls": "memory", "args": {}}, "object_types": { "content", "directory", + "metadata_authority", + "metadata_fetcher", "origin", "origin_visit", "origin_visit_status", + "raw_extrinsic_metadata", "release", "revision", "snapshot", "skipped_content", }, "privileged_object_types": {"release", "revision",}, "brokers": [kafka_server_base], "prefix": kafka_prefix, } diff --git a/version.txt b/version.txt index d0c9d07..d94ba98 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.4.0-0-gbf35ea7 \ No newline at end of file +v0.4.1-0-g94f282f \ No newline at end of file
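The serializer changes above mean the three new metadata object types (MetadataAuthority, MetadataFetcher, RawExtrinsicMetadata) can be keyed and (de)serialized like any other model object. Below is a minimal sketch exercising those helpers; it assumes swh.journal 0.4.1 and swh.model >= 0.6.1 are installed and reuses the sample values from the test data added in this diff.

```python
# Minimal sketch (assumes swh.journal 0.4.1 and swh.model >= 0.6.1 are installed).
from swh.journal.serializers import (
    kafka_to_value,
    key_to_kafka,
    object_key,
    value_to_kafka,
)
from swh.model.model import MetadataAuthority, MetadataAuthorityType, MetadataFetcher

authority = MetadataAuthority(
    type=MetadataAuthorityType.FORGE, url="http://example.org/", metadata={},
)
fetcher = MetadataFetcher(name="test-fetcher", version="1.0.0", metadata={})

# object_key() now handles the new object types: it returns a dict key built
# from the authority's type and url (resp. the fetcher's name and version).
authority_key = object_key("metadata_authority", authority)
fetcher_key = object_key("metadata_fetcher", fetcher)

# Keys and values are msgpack-encoded before being produced to kafka...
encoded_key = key_to_kafka(fetcher_key)
encoded_value = value_to_kafka(fetcher.to_dict())

# ...and decoding the value yields the dict representation back unchanged.
assert kafka_to_value(encoded_value) == fetcher.to_dict()
```

Since TEST_OBJECT_DICTS is removed in this release, downstream test suites that still want dict-shaped fixtures can derive them from TEST_OBJECTS. A hedged sketch follows; the helper name is illustrative and not part of the package:

```python
# Hypothetical compatibility helper for code that used the removed
# TEST_OBJECT_DICTS constant: rebuild plain dicts from the model objects.
from typing import Any, Dict, List

from swh.journal.tests.journal_data import TEST_OBJECTS


def build_test_object_dicts() -> Dict[str, List[Dict[str, Any]]]:
    # Every entry in TEST_OBJECTS is a swh.model object, so to_dict() gives
    # back a dict representation comparable to what the old constant held.
    return {
        object_type: [obj.to_dict() for obj in objects]
        for object_type, objects in TEST_OBJECTS.items()
    }
```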