diff --git a/PKG-INFO b/PKG-INFO index 209f929..108b45f 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,76 +1,76 @@ Metadata-Version: 2.1 Name: swh.journal -Version: 0.9.1 +Version: 1.0.0 Summary: Software Heritage Journal utilities Home-page: https://forge.softwareheritage.org/diffusion/DJNL/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-journal Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-journal/ Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing License-File: LICENSE License-File: AUTHORS swh-journal =========== Persistent logger of changes to the archive, with publish-subscribe support. See the [documentation](https://docs.softwareheritage.org/devel/swh-journal/index.html#software-heritage-journal) for more details. # Local test As a prerequisite, you need a kafka installation path. The following target will take care of this: ``` make install ``` Then, provided you are in the right virtual environment as described in the [swh getting-started](https://docs.softwareheritage.org/devel/developer-setup.html#developer-setup): ``` pytest ``` or: ``` tox ``` # Running ## publisher Command: ``` $ swh-journal --config-file ~/.config/swh/journal/publisher.yml \ publisher ``` # Auto-completion To enable shell completion, add the following to your ~/.virtualenvs/swh/bin/postactivate: ``` eval "$(_SWH_JOURNAL_COMPLETE=$autocomplete_cmd swh-journal)" ``` diff --git a/debian/changelog b/debian/changelog index 697c92e..1bcf85f 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,530 +1,535 @@ -swh-journal (0.9.1-1~swh1~bpo10+1) buster-swh; urgency=medium +swh-journal (1.0.0-1~swh1) unstable-swh; urgency=medium - * Rebuild for buster-swh + * New upstream release 1.0.0 - (tagged by David Douard + on 2022-01-21 11:44:07 +0100) + * Upstream changes: - v1.0.0 - remove 'process_timeout' from + JournalClient arguments - add statsd metrics in the journal + client - add support for generic config of the rdkafka's + stats_cb callback - -- Software Heritage autobuilder (on jenkins-debian1) Tue, 16 Nov 2021 15:41:59 +0000 + -- Software Heritage autobuilder (on jenkins-debian1) Fri, 21 Jan 2022 10:57:09 +0000 swh-journal (0.9.1-1~swh1) unstable-swh; urgency=medium * New upstream release 0.9.1 - (tagged by David Douard on 2021-11-16 16:28:40 +0100) * Upstream changes: - v0.9.1 - fix flaky tests -- Software Heritage autobuilder (on jenkins-debian1) Tue, 16 Nov 2021 15:38:07 +0000 swh-journal (0.9.0-2~swh1) unstable-swh; urgency=medium * No need to explicitly call the pytest plugin, it's registered via setup.py -- Nicolas Dandrimont Tue, 16 Nov 2021 11:09:15 +0100 swh-journal (0.9.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.9.0 - (tagged by David Douard on 2021-10-28 18:37:28 +0200) * Upstream changes: - v0.9.0 -- Software Heritage autobuilder (on jenkins-debian1) Thu, 28 Oct 2021 16:43:54 +0000 swh-journal (0.8.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.8.0 - (tagged by David Douard on
2021-06-18 11:23:06 +0200) * Upstream changes: - v0.8.0 -- Software Heritage autobuilder (on jenkins-debian1) Fri, 18 Jun 2021 10:17:01 +0000 swh-journal (0.7.1-1~swh1) unstable-swh; urgency=medium * New upstream release 0.7.1 - (tagged by Antoine Lambert on 2021-02-09 15:41:45 +0100) * Upstream changes: - version 0.7.1 -- Software Heritage autobuilder (on jenkins-debian1) Tue, 09 Feb 2021 14:46:12 +0000 swh-journal (0.7.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.7.0 - (tagged by David Douard on 2021-01-27 15:38:49 +0100) * Upstream changes: - v0.7.0 -- Software Heritage autobuilder (on jenkins-debian1) Wed, 27 Jan 2021 14:42:51 +0000 swh-journal (0.6.2-1~swh1) unstable-swh; urgency=medium * New upstream release 0.6.2 - (tagged by Vincent SELLIER on 2021-01-14 14:28:31 +0100) * Upstream changes: - v0.6.2 - * 2021-01-13 Add new field OriginVisitStatus.type field on test data -- Software Heritage autobuilder (on jenkins-debian1) Thu, 14 Jan 2021 13:31:15 +0000 swh-journal (0.6.1-1~swh1) unstable-swh; urgency=medium * New upstream release 0.6.1 - (tagged by Valentin Lorentz on 2020-12-21 11:29:15 +0100) * Upstream changes: - v0.6.1 - * serializers: Deserialize as lists instead of tuples - * Fix dependency on msgpack (>= 1.0.0) - * Blacklist msgpack 1.0.1. -- Software Heritage autobuilder (on jenkins-debian1) Mon, 21 Dec 2020 10:31:53 +0000 swh-journal (0.6.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.6.0 - (tagged by David Douard on 2020-12-08 15:27:18 +0100) * Upstream changes: - v0.6.0 -- Software Heritage autobuilder (on jenkins-debian1) Tue, 08 Dec 2020 14:31:08 +0000 swh-journal (0.5.1-1~swh1) unstable-swh; urgency=medium * New upstream release 0.5.1 - (tagged by Valentin Lorentz on 2020-11-05 15:17:23 +0100) * Upstream changes: - v0.5.1 - This release makes the JournalWriter usable by swh-indexer-storage: - * Make value_sanitizer an argument of JournalWriter. - * Make the type of values of JournalWriter generic, so it works with types not from swh-model. -- Software Heritage autobuilder (on jenkins-debian1) Thu, 05 Nov 2020 14:20:17 +0000 swh-journal (0.5.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.5.0 - (tagged by Nicolas Dandrimont on 2020-10-27 16:01:22 +0100) * Upstream changes: - Release swh.journal v0.5.0 - Use the model-provided unique_key as deduplication key - Drop long- deprecated swh.journal.cli module - Use RawExtrinsicMetadata.target attribute instead of .id - CI changes: pin black version, use upstream flake8 hook -- Software Heritage autobuilder (on jenkins-debian1) Tue, 27 Oct 2020 15:05:44 +0000 swh-journal (0.4.3-1~swh1) unstable-swh; urgency=medium * New upstream release 0.4.3 - (tagged by David Douard on 2020-09-25 11:51:00 +0200) * Upstream changes: - v0.4.3 -- Software Heritage autobuilder (on jenkins-debian1) Fri, 25 Sep 2020 09:53:37 +0000 swh-journal (0.4.2-1~swh2) unstable-swh; urgency=medium * Fix debian dependencies -- Antoine R. Dumont (@ardumont)) Fri, 07 Aug 2020 09:38:25 +0000 swh-journal (0.4.2-1~swh1) unstable-swh; urgency=medium * New upstream release 0.4.2 - (tagged by Antoine R. 
Dumont (@ardumont) on 2020-08-07 10:58:45 +0200) * Upstream changes: - v0.4.2 - pytest_plugin: Deal with the case when ctime is dropped - setup.py: Migrate from vcversioner to setuptools-scm -- Software Heritage autobuilder (on jenkins-debian1) Fri, 07 Aug 2020 09:01:25 +0000 swh-journal (0.4.1-1~swh1) unstable-swh; urgency=medium * New upstream release 0.4.1 - (tagged by Valentin Lorentz on 2020-07-31 11:25:19 +0200) * Upstream changes: - v0.4.1 - * Remove TEST_OBJECT_DICTS, use only TEST_OBJECTS. - * Add support for MetadataAuthority, MetadataFetcher, and RawExtrinsicMetadata. -- Software Heritage autobuilder (on jenkins-debian1) Fri, 31 Jul 2020 09:31:32 +0000 swh-journal (0.4.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.4.0 - (tagged by David Douard on 2020-07-06 13:49:40 +0200) * Upstream changes: - v0.4.0 -- Software Heritage autobuilder (on jenkins-debian1) Mon, 06 Jul 2020 11:58:06 +0000 swh-journal (0.3.5-1~swh1) unstable-swh; urgency=medium * New upstream release 0.3.5 - (tagged by Antoine R. Dumont (@ardumont) on 2020-07-01 15:55:31 +0200) * Upstream changes: - v0.3.5 - journal_data: Drop obsolete origin_visit fields - Use proper hash ids in tests' journal_data -- Software Heritage autobuilder (on jenkins-debian1) Wed, 01 Jul 2020 14:00:27 +0000 swh-journal (0.3.4-1~swh1) unstable-swh; urgency=medium * New upstream release 0.3.4 - (tagged by Antoine R. Dumont (@ardumont) on 2020-06-25 10:10:16 +0200) * Upstream changes: - v0.3.4 - Drop datetime conversion indirection -- Software Heritage autobuilder (on jenkins-debian1) Thu, 25 Jun 2020 08:13:02 +0000 swh-journal (0.3.3-1~swh1) unstable-swh; urgency=medium * New upstream release 0.3.3 - (tagged by Antoine R. Dumont (@ardumont) on 2020-06-25 09:34:32 +0200) * Upstream changes: - v0.3.3 - journal_data: Make origin-visit optional fields to None -- Software Heritage autobuilder (on jenkins-debian1) Thu, 25 Jun 2020 07:36:28 +0000 swh-journal (0.3.2-1~swh1) unstable-swh; urgency=medium * New upstream release 0.3.2 - (tagged by David Douard on 2020-06-17 09:29:44 +0200) * Upstream changes: - v0.3.2 -- Software Heritage autobuilder (on jenkins-debian1) Wed, 17 Jun 2020 07:37:10 +0000 swh-journal (0.3.1-1~swh1) unstable-swh; urgency=medium * New upstream release 0.3.1 - (tagged by Antoine R. Dumont (@ardumont) on 2020-06-10 10:54:44 +0200) * Upstream changes: - v0.3.1 - pytest_plugin: pprint key when assertion is not respected -- Software Heritage autobuilder (on jenkins-debian1) Wed, 10 Jun 2020 08:56:57 +0000 swh-journal (0.3.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.3.0 - (tagged by Antoine R. Dumont (@ardumont) on 2020-06-09 10:42:30 +0200) * Upstream changes: - v0.3.0 - Allow journal to deal with origin_visit_status - test: Use origin-visit date field as datetime to phase out iso8601 str -- Software Heritage autobuilder (on jenkins-debian1) Tue, 09 Jun 2020 08:44:44 +0000 swh-journal (0.2.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.2.0 - (tagged by David Douard on 2020-06-03 13:50:54 +0200) * Upstream changes: - v0.2.0 -- Software Heritage autobuilder (on jenkins-debian1) Wed, 03 Jun 2020 12:00:31 +0000 swh-journal (0.1.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.1.0 - (tagged by David Douard on 2020-05-07 10:33:18 +0200) * Upstream changes: - v0.1.0 -- Software Heritage autobuilder (on jenkins-debian1) Thu, 07 May 2020 08:35:49 +0000 swh-journal (0.0.32-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.32 - (tagged by Antoine R. 
Dumont (@ardumont) on 2020-05-04 18:02:45 +0200) * Upstream changes: - v0.0.32 - serializers: Make kafka_to_key implem compatible with stable version - setup.py: add documentation link - Remove the content replayer code - Remove the backfiller and the (storage) replayer -- Software Heritage autobuilder (on jenkins-debian1) Mon, 04 May 2020 16:04:38 +0000 swh-journal (0.0.31-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.31 - (tagged by David Douard on 2020-04-23 12:27:47 +0200) * Upstream changes: - v0.0.31 -- Software Heritage autobuilder (on jenkins-debian1) Thu, 23 Apr 2020 10:37:28 +0000 swh-journal (0.0.30-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.30 - (tagged by Nicolas Dandrimont on 2020-04-14 15:57:34 +0200) * Upstream changes: - Release swh.journal v0.0.30 - various test refactorings - enable black on the source code - accept swh.model objects in the journal writers - listen to delivery notifications after writing messages - default to message.max.bytes = 100 MB in the kafka writer -- Software Heritage autobuilder (on jenkins-debian1) Tue, 14 Apr 2020 14:10:54 +0000 swh-journal (0.0.29-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.29 - (tagged by Antoine R. Dumont (@ardumont) on 2020-03-27 10:40:46 +0100) * Upstream changes: - v0.0.29 - replayer: Allow legacy origin to be replayed - tests.utils: Test from modules using this need to call close method - tests: Adapt model according to latest change -- Software Heritage autobuilder (on jenkins-debian1) Fri, 27 Mar 2020 09:51:08 +0000 swh-journal (0.0.28-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.28 - (tagged by Antoine R. Dumont (@ardumont) on 2020-03-25 10:14:47 +0100) * Upstream changes: - v0.0.28 - journal.replay: Migrate to latest HashCollision change - replayer: factor out legacy objects fixers - journal.replay: Inline `_fix_origin_visit` for loop in insert_object - journal.replay: Align _fix_content with other fix methods - journal.replay: Align fix revision behavior to other fix methods - Remove extra 'perms' key of contents when replaying. -- Software Heritage autobuilder (on jenkins-debian1) Wed, 25 Mar 2020 09:23:55 +0000 swh-journal (0.0.27-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.27 - (tagged by Antoine R. 
Dumont (@ardumont) on 2020-03-16 16:09:33 +0100) * Upstream changes: - v0.0.27 - Migrate to latest origin_visit_upsert/add api changes - journal: Use swh-model objects instead of dicts in replay and writer - replay: Filter out colliding contents when replaying - Use better kafka producer semantics in the journal writers - Make the number of messages processed at a time by journal clients - configurable - Drop deprecated cli options - Replace deprecated options with a config file override in cli tests - Clean up the signature of test_cli's invoke method - Migrate test cli config to a dict instead of raw yaml - kafka: normalize KafkaJournalWriter.write_addition[s] API - Rename JournalClient.max_messages to JournalClient.stop_after_objects - Be more careful with content generation in test_write_replay - Add type annotations to swh.journal.client arguments - Unify tense for content replay statistics log entry - Add missing log4j.properties file from MANIFEST.in - Unify retry/error handling for content replay -- Software Heritage autobuilder (on jenkins-debian1) Mon, 16 Mar 2020 15:17:43 +0000 swh-journal (0.0.26-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.26 - (tagged by David Douard on 2020-03-06 15:36:23 +0100) * Upstream changes: - v0.0.26 -- Software Heritage autobuilder (on jenkins-debian1) Fri, 06 Mar 2020 14:47:18 +0000 swh-journal (0.0.25-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.25 - (tagged by Valentin Lorentz on 2020-01-30 15:35:41 +0100) * Upstream changes: - v0.0.25 - * Add support for swh-storage v0.0.168. - * Accept None dates when validating revisions. -- Software Heritage autobuilder (on jenkins-debian1) Thu, 30 Jan 2020 14:43:25 +0000 swh-journal (0.0.24-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.24 - (tagged by Antoine Lambert on 2020-01-06 16:14:32 +0100) * Upstream changes: - version 0.0.24 -- Software Heritage autobuilder (on jenkins-debian1) Mon, 06 Jan 2020 15:20:54 +0000 swh-journal (0.0.23-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.23 - (tagged by Nicolas Dandrimont on 2020-01-03 20:02:39 +0100) * Upstream changes: - Release swh.journal v0.0.23 - support short-hand syntax for initializing the journal direct writer -- Software Heritage autobuilder (on jenkins-debian1) Fri, 03 Jan 2020 19:06:31 +0000 swh-journal (0.0.21-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.21 - (tagged by David Douard on 2019-11-29 15:37:49 +0100) * Upstream changes: - v0.0.21 -- Software Heritage autobuilder (on jenkins-debian1) Mon, 02 Dec 2019 14:12:49 +0000 swh-journal (0.0.20-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.20 - (tagged by David Douard on 2019-11-29 12:06:13 +0100) * Upstream changes: - v0.0.20 -- Software Heritage autobuilder (on jenkins-debian1) Fri, 29 Nov 2019 11:10:44 +0000 swh-journal (0.0.19-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.19 - (tagged by Nicolas Dandrimont on 2019-11-07 14:33:58 +0100) * Upstream changes: - Release swh.journal v0.0.19 - Merge several reliability fixes -- Software Heritage autobuilder (on jenkins-debian1) Thu, 07 Nov 2019 13:37:23 +0000 swh-journal (0.0.18-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.18 - (tagged by Stefano Zacchiroli on 2019-10-01 10:19:26 +0200) * Upstream changes: - v0.0.18 - * tox: anticipate mypy run to just after flake8 - * init.py: switch to documented way of extending path - * typing: minimal changes to make a no-op mypy run pass - * writer: Normalize 'cls' 
value to 'memory' for in- memory instantiation - * Add a test directly for the journal client. -- Software Heritage autobuilder (on jenkins-debian1) Tue, 01 Oct 2019 11:12:03 +0000 swh-journal (0.0.17-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.17 - (tagged by Nicolas Dandrimont on 2019-09-18 18:01:50 +0200) * Upstream changes: - Release swh.journal v0.0.17 - Cleanup fallout from confluent_kafka migration - Better error handling and logging in direct_writer - Backfiller fixups - More extensive mock for KafkaConsumer -- Software Heritage autobuilder (on jenkins-debian1) Wed, 18 Sep 2019 16:08:20 +0000 swh-journal (0.0.16-1~swh2) unstable-swh; urgency=medium * Migrate to confluent-kafka -- Nicolas Dandrimont Fri, 13 Sep 2019 20:11:24 +0200 swh-journal (0.0.16-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.16 - (tagged by Nicolas Dandrimont on 2019-09-13 14:28:50 +0200) * Upstream changes: - Release swh.journal v0.0.16: - Migrate to confluent-kafka from python-kafka -- Software Heritage autobuilder (on jenkins-debian1) Fri, 13 Sep 2019 12:34:35 +0000 swh-journal (0.0.15-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.15 - (tagged by David Douard on 2019-09-10 16:50:42 +0200) * Upstream changes: - v0.0.15 -- Software Heritage autobuilder (on jenkins-debian1) Tue, 10 Sep 2019 14:53:55 +0000 swh-journal (0.0.14-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.14 - (tagged by David Douard on 2019-07-18 13:38:39 +0200) * Upstream changes: - 0.0.14 - Code of conduct - fix the backfiller - fix compatibility with click < 7 - make the replayer robust against old formats -- Software Heritage autobuilder (on jenkins-debian1) Thu, 18 Jul 2019 11:44:40 +0000 swh-journal (0.0.13-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.13 - (tagged by Antoine R. Dumont (@ardumont) on 2019-07-03 10:26:29 +0200) * Upstream changes: - v0.0.13 - cli: Document depreated options -- Software Heritage autobuilder (on jenkins-debian1) Wed, 03 Jul 2019 08:33:57 +0000 swh-journal (0.0.12-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.12 - (tagged by Valentin Lorentz on 2019-07-02 11:58:00 +0200) * Upstream changes: - v0.0.12 - More CLI option - Replay parallelism - Fix build on Debian 9 -- Software Heritage autobuilder (on jenkins-debian1) Tue, 02 Jul 2019 10:08:07 +0000 swh-journal (0.0.11-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.11 - (tagged by David Douard on 2019-06-12 13:58:14 +0200) * Upstream changes: - v0.0.11 -- Software Heritage autobuilder (on jenkins-debian1) Wed, 12 Jun 2019 12:10:52 +0000 swh-journal (0.0.10-1~swh2) unstable-swh; urgency=medium * Disable tests at build-time -- Nicolas Dandrimont Thu, 09 May 2019 14:42:24 +0200 swh-journal (0.0.10-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.10 - (tagged by Nicolas Dandrimont on 2019-05-09 14:29:52 +0200) * Upstream changes: - Release swh.journal v0.0.10 - Remove the publisher component, introduce the backfiller component. -- Software Heritage autobuilder (on jenkins-debian1) Thu, 09 May 2019 12:34:36 +0000 swh-journal (0.0.9-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.9 - (tagged by David Douard on 2019-04-10 13:42:32 +0200) * Upstream changes: - v0.0.9 -- Software Heritage autobuilder (on jenkins-debian1) Wed, 10 Apr 2019 11:48:39 +0000 swh-journal (0.0.8-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.8 - (tagged by Antoine R. 
Dumont (@ardumont) on 2019-03-15 13:56:57 +0100) * Upstream changes: - v0.0.8 - Add swh-journal cli -- Software Heritage autobuilder (on jenkins-debian1) Fri, 15 Mar 2019 13:00:11 +0000 swh-journal (0.0.7-1~swh2) unstable-swh; urgency=low * New release fixing build dependencies -- Antoine Romain Dumont (@ardumont) Tue, 19 Feb 2019 14:18:06 +0100 swh-journal (0.0.7-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.7 - (tagged by Antoine R. Dumont (@ardumont) on 2019-01-11 11:53:44 +0100) * Upstream changes: - v0.0.7 - Fix off-by-one error when checking max_messages. - tests: Adapt tests according to latest in-memory storage changes -- Software Heritage autobuilder (on jenkins-debian1) Fri, 11 Jan 2019 10:56:46 +0000 swh-journal (0.0.4-1~swh1) unstable-swh; urgency=medium * Release swh.journal version 0.0.4 * Update packaging runes -- Nicolas Dandrimont Thu, 12 Oct 2017 19:01:53 +0200 swh-journal (0.0.3-1~swh1) unstable-swh; urgency=medium * Release swh.journal v0.0.3 * Prepare building for stretch -- Nicolas Dandrimont Fri, 30 Jun 2017 17:29:15 +0200 swh-journal (0.0.2-1~swh1) unstable-swh; urgency=medium * v0.0.2 * Adapt swh.journal.publisher * Adapt swh.journal.client * Add swh.journal.checker basic implementation (reads and sends all * objects to publisher's subscribed queues). -- Antoine R. Dumont (@ardumont) Fri, 24 Mar 2017 12:54:16 +0100 swh-journal (0.0.1-1~swh1) unstable-swh; urgency=medium * Initial release * v0.0.1 * Add a journal publisher * Add a base class interface for journal clients -- Antoine R. Dumont (@ardumont) Tue, 21 Mar 2017 14:38:13 +0100 diff --git a/requirements-swh.txt b/requirements-swh.txt index aa7bbf8..49b0699 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1 +1,2 @@ +swh.core >= 1.1 swh.model >= 0.12.0 diff --git a/requirements-test.txt b/requirements-test.txt index 9b9e5f9..2eeea7c 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,2 +1,3 @@ pytest hypothesis +swh.core[testing] diff --git a/swh.journal.egg-info/PKG-INFO b/swh.journal.egg-info/PKG-INFO index 209f929..108b45f 100644 --- a/swh.journal.egg-info/PKG-INFO +++ b/swh.journal.egg-info/PKG-INFO @@ -1,76 +1,76 @@ Metadata-Version: 2.1 Name: swh.journal -Version: 0.9.1 +Version: 1.0.0 Summary: Software Heritage Journal utilities Home-page: https://forge.softwareheritage.org/diffusion/DJNL/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-journal Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-journal/ Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing License-File: LICENSE License-File: AUTHORS swh-journal =========== Persistent logger of changes to the archive, with publish-subscribe support. See the [documentation](https://docs.softwareheritage.org/devel/swh-journal/index.html#software-heritage-journal) for more details. # Local test As a prerequisite, you need a kafka installation path.
The following target will take care of this: ``` make install ``` Then, provided you are in the right virtual environment as described in the [swh getting-started](https://docs.softwareheritage.org/devel/developer-setup.html#developer-setup): ``` pytest ``` or: ``` tox ``` # Running ## publisher Command: ``` $ swh-journal --config-file ~/.config/swh/journal/publisher.yml \ publisher ``` # Auto-completion To enable shell completion, add the following to your ~/.virtualenvs/swh/bin/postactivate: ``` eval "$(_SWH_JOURNAL_COMPLETE=$autocomplete_cmd swh-journal)" ``` diff --git a/swh.journal.egg-info/requires.txt b/swh.journal.egg-info/requires.txt index 8ab7ac4..3b1a75d 100644 --- a/swh.journal.egg-info/requires.txt +++ b/swh.journal.egg-info/requires.txt @@ -1,8 +1,10 @@ confluent-kafka msgpack!=1.0.1,>=1.0.0 tenacity +swh.core>=1.1 swh.model>=0.12.0 [testing] pytest hypothesis +swh.core[testing] diff --git a/swh/__init__.py b/swh/__init__.py index 8d9f151..b36383a 100644 --- a/swh/__init__.py +++ b/swh/__init__.py @@ -1,4 +1,3 @@ from pkgutil import extend_path -from typing import List -__path__: List[str] = extend_path(__path__, __name__) +__path__ = extend_path(__path__, __name__) diff --git a/swh/journal/client.py b/swh/journal/client.py index ff6f0ef..028f94b 100644 --- a/swh/journal/client.py +++ b/swh/journal/client.py @@ -1,323 +1,355 @@ -# Copyright (C) 2017 The Software Heritage developers +# Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict +from importlib import import_module import logging import os -import time from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union from confluent_kafka import Consumer, KafkaError, KafkaException +from swh.core.statsd import statsd from swh.journal import DEFAULT_PREFIX from .serializers import kafka_to_value logger = logging.getLogger(__name__) rdkafka_logger = logging.getLogger(__name__ + ".rdkafka") # Only accepted offset reset policies ACCEPTED_OFFSET_RESET = ["earliest", "latest"] # Errors that Kafka raises too often and are not useful; therefore # we lower their log level to DEBUG instead of INFO. _SPAMMY_ERRORS = [ KafkaError._NO_OFFSET, ] +JOURNAL_MESSAGE_NUMBER_METRIC = "swh_journal_client_handle_message_total" +JOURNAL_STATUS_METRIC = "swh_journal_client_status" + def get_journal_client(cls: str, **kwargs: Any): """Factory function to instantiate a journal client object. Currently, only the "kafka" journal client is supported.
""" if cls == "kafka": + if "stats_cb" in kwargs: + stats_cb = kwargs["stats_cb"] + if isinstance(stats_cb, str): + try: + module_path, func_name = stats_cb.split(":") + except ValueError: + raise ValueError( + "Invalid stats_cb configuration option: " + "it should be a string like 'path.to.module:function'" + ) + try: + module = import_module(module_path, package=__package__) + except ModuleNotFoundError: + raise ValueError( + "Invalid stats_cb configuration option: " + f"module {module_path} not found" + ) + try: + kwargs["stats_cb"] = getattr(module, func_name) + except AttributeError: + raise ValueError( + "Invalid stats_cb configuration option: " + f"function {func_name} not found in module {module_path}" + ) return JournalClient(**kwargs) raise ValueError("Unknown journal client class `%s`" % cls) def _error_cb(error): if error.fatal(): raise KafkaException(error) if error.code() in _SPAMMY_ERRORS: logger.debug("Received non-fatal kafka error: %s", error) else: logger.info("Received non-fatal kafka error: %s", error) def _on_commit(error, partitions): if error is not None: _error_cb(error) class JournalClient: """A base client for the Software Heritage journal. The current implementation of the journal uses Apache Kafka brokers to publish messages under a given topic prefix, with each object type using a specific topic under that prefix. If the `prefix` argument is None (default value), it will take the default value `'swh.journal.objects'`. Clients subscribe to events specific to each object type as listed in the `object_types` argument (if unset, defaults to all existing kafka topic under the prefix). Clients can be sharded by setting the `group_id` to a common value across instances. The journal will share the message throughput across the nodes sharing the same group_id. Messages are processed by the `worker_fn` callback passed to the `process` method, in batches of maximum `batch_size` messages (defaults to 200). The objects passed to the `worker_fn` callback are the result of the kafka message converted by the `value_deserializer` function. By default (if this argument is not given), it will produce dicts (using the `kafka_to_value` function). This signature of the function is: `value_deserializer(object_type: str, kafka_msg: bytes) -> Any` If the value returned by `value_deserializer` is None, it is ignored and not passed the `worker_fn` function. If set, the processing stops after processing `stop_after_objects` messages in total. `stop_on_eof` stops the processing when the client has reached the end of each partition in turn. `auto_offset_reset` sets the behavior of the client when the consumer group initializes: `'earliest'` (the default) processes all objects since the inception of the topics; `''` Any other named argument is passed directly to KafkaConsumer(). 
""" def __init__( self, brokers: Union[str, List[str]], group_id: str, prefix: Optional[str] = None, object_types: Optional[List[str]] = None, privileged: bool = False, stop_after_objects: Optional[int] = None, batch_size: int = 200, process_timeout: Optional[float] = None, auto_offset_reset: str = "earliest", stop_on_eof: bool = False, value_deserializer: Optional[Callable[[str, bytes], Any]] = None, **kwargs, ): if prefix is None: prefix = DEFAULT_PREFIX if auto_offset_reset not in ACCEPTED_OFFSET_RESET: raise ValueError( "Option 'auto_offset_reset' only accept %s, not %s" % (ACCEPTED_OFFSET_RESET, auto_offset_reset) ) if batch_size <= 0: raise ValueError("Option 'batch_size' needs to be positive") if value_deserializer: self.value_deserializer = value_deserializer else: self.value_deserializer = lambda _, value: kafka_to_value(value) if isinstance(brokers, str): brokers = [brokers] debug_logging = rdkafka_logger.isEnabledFor(logging.DEBUG) if debug_logging and "debug" not in kwargs: kwargs["debug"] = "consumer" # Static group instance id management group_instance_id = os.environ.get("KAFKA_GROUP_INSTANCE_ID") if group_instance_id: kwargs["group.instance.id"] = group_instance_id if "group.instance.id" in kwargs: # When doing static consumer group membership, set a higher default # session timeout. The session timeout is the duration after which # the broker considers that a consumer has left the consumer group # for good, and triggers a rebalance. Considering our current # processing pattern, 10 minutes gives the consumer ample time to # restart before that happens. if "session.timeout.ms" not in kwargs: kwargs["session.timeout.ms"] = 10 * 60 * 1000 # 10 minutes if "session.timeout.ms" in kwargs: # When the session timeout is set, rdkafka requires the max poll # interval to be set to a higher value; the max poll interval is # rdkafka's way of figuring out whether the client's message # processing thread has stalled: when the max poll interval lapses # between two calls to consumer.poll(), rdkafka leaves the consumer # group and terminates the connection to the brokers. 
# # We default to 1.5 times the session timeout if "max.poll.interval.ms" not in kwargs: kwargs["max.poll.interval.ms"] = kwargs["session.timeout.ms"] // 2 * 3 consumer_settings = { **kwargs, "bootstrap.servers": ",".join(brokers), "auto.offset.reset": auto_offset_reset, "group.id": group_id, "on_commit": _on_commit, "error_cb": _error_cb, "enable.auto.commit": False, "logger": rdkafka_logger, } self.stop_on_eof = stop_on_eof if self.stop_on_eof: consumer_settings["enable.partition.eof"] = True logger.debug("Consumer settings: %s", consumer_settings) self.consumer = Consumer(consumer_settings) if privileged: privileged_prefix = f"{prefix}_privileged" else: # do not attempt to subscribe to privileged topics privileged_prefix = f"{prefix}" existing_topics = [ topic for topic in self.consumer.list_topics(timeout=10).topics.keys() if ( topic.startswith(f"{prefix}.") or topic.startswith(f"{privileged_prefix}.") ) ] if not existing_topics: raise ValueError( f"The prefix {prefix} does not match any existing topic " "on the kafka broker" ) if not object_types: object_types = list({topic.split(".")[-1] for topic in existing_topics}) self.subscription = [] unknown_types = [] for object_type in object_types: topics = (f"{privileged_prefix}.{object_type}", f"{prefix}.{object_type}") for topic in topics: if topic in existing_topics: self.subscription.append(topic) break else: unknown_types.append(object_type) if unknown_types: raise ValueError( f"Topic(s) for object types {','.join(unknown_types)} " "are unknown on the kafka broker" ) logger.debug(f"Upstream topics: {existing_topics}") self.subscribe() self.stop_after_objects = stop_after_objects - self.process_timeout = process_timeout + self.eof_reached: Set[Tuple[str, str]] = set() self.batch_size = batch_size + if process_timeout is not None: + raise DeprecationWarning( + "'process_timeout' argument is not supported anymore by " + "JournalClient; please remove it from your configuration.", + ) + def subscribe(self): """Subscribe to topics listed in self.subscription This can be overridden if you need, for instance, to manually assign partitions. """ logger.debug(f"Subscribing to: {self.subscription}") self.consumer.subscribe(topics=self.subscription) def process(self, worker_fn): """Polls Kafka for a batch of messages, and calls the worker_fn with these messages. Args: worker_fn Callable[Dict[str, List[dict]]]: Function called with the messages as argument. """ - start_time = time.monotonic() total_objects_processed = 0 - - while True: - # timeout for message poll - timeout = 1.0 - - elapsed = time.monotonic() - start_time - if self.process_timeout: - # +0.01 to prevent busy-waiting on / spamming consumer.poll. - # consumer.consume() returns shortly before X expired - # (a matter of milliseconds), so after it returns a first - # time, it would then be called with a timeout in the order - # of milliseconds, therefore returning immediately, then be - # called again, etc. 
- if elapsed + 0.01 >= self.process_timeout: - break - - timeout = self.process_timeout - elapsed - - batch_size = self.batch_size - if self.stop_after_objects: - if total_objects_processed >= self.stop_after_objects: + # timeout for message poll + timeout = 1.0 + + with statsd.status_gauge( + JOURNAL_STATUS_METRIC, statuses=["idle", "processing", "waiting"] + ) as set_status: + set_status("idle") + while True: + batch_size = self.batch_size + if self.stop_after_objects: + if total_objects_processed >= self.stop_after_objects: + break + + # clamp batch size to avoid overrunning stop_after_objects + batch_size = min( + self.stop_after_objects - total_objects_processed, batch_size, + ) + + set_status("waiting") + while True: + messages = self.consumer.consume( + timeout=timeout, num_messages=batch_size + ) + if messages: + break + + set_status("processing") + batch_processed, at_eof = self.handle_messages(messages, worker_fn) + + set_status("idle") + # report the number of handled messages + statsd.increment(JOURNAL_MESSAGE_NUMBER_METRIC, value=batch_processed) + total_objects_processed += batch_processed + if at_eof: break - # clamp batch size to avoid overrunning stop_after_objects - batch_size = min( - self.stop_after_objects - total_objects_processed, batch_size, - ) - - messages = self.consumer.consume(timeout=timeout, num_messages=batch_size) - if not messages: - continue - - batch_processed, at_eof = self.handle_messages(messages, worker_fn) - total_objects_processed += batch_processed - if at_eof: - break - return total_objects_processed def handle_messages(self, messages, worker_fn): objects: Dict[str, List[Any]] = defaultdict(list) nb_processed = 0 for message in messages: error = message.error() if error is not None: if error.code() == KafkaError._PARTITION_EOF: self.eof_reached.add((message.topic(), message.partition())) else: _error_cb(error) continue if message.value() is None: # ignore message with no payload, these can be generated in tests continue nb_processed += 1 object_type = message.topic().split(".")[-1] deserialized_object = self.deserialize_message( message, object_type=object_type ) if deserialized_object is not None: objects[object_type].append(deserialized_object) if objects: worker_fn(dict(objects)) self.consumer.commit() at_eof = self.stop_on_eof and all( (tp.topic, tp.partition) in self.eof_reached for tp in self.consumer.assignment() ) return nb_processed, at_eof def deserialize_message(self, message, object_type=None): return self.value_deserializer(object_type, message.value()) def close(self): self.consumer.close() diff --git a/swh/journal/pytest_plugin.py b/swh/journal/pytest_plugin.py index 44bbb74..eb82b69 100644 --- a/swh/journal/pytest_plugin.py +++ b/swh/journal/pytest_plugin.py @@ -1,261 +1,261 @@ # Copyright (C) 2019-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict import random import string from typing import Any, Collection, Dict, Iterator, Optional import attr from confluent_kafka import Consumer, KafkaException, Producer from confluent_kafka.admin import AdminClient import pytest from swh.journal.serializers import kafka_to_key, kafka_to_value, pprint_key from swh.model.tests.swh_model_data import TEST_OBJECTS def ensure_lists(value: Any) -> Any: """ >>> ensure_lists(["foo", 42]) ['foo', 42] >>> ensure_lists(("foo", 42)) ['foo', 42] 
>>> ensure_lists({"a": ["foo", 42]}) {'a': ['foo', 42]} >>> ensure_lists({"a": ("foo", 42)}) {'a': ['foo', 42]} """ if isinstance(value, (tuple, list)): return list(map(ensure_lists, value)) elif isinstance(value, dict): return dict(ensure_lists(list(value.items()))) else: return value def consume_messages(consumer, kafka_prefix, expected_messages): """Consume expected_messages from the consumer; Sort them all into a consumed_objects dict""" consumed_messages = defaultdict(list) fetched_messages = 0 retries_left = 1000 while fetched_messages < expected_messages: if retries_left == 0: raise ValueError( "Timed out fetching messages from kafka. " f"Only {fetched_messages}/{expected_messages} fetched" ) - msg = consumer.poll(timeout=0.01) + msg = consumer.poll(timeout=0.1) if not msg: retries_left -= 1 continue error = msg.error() if error is not None: if error.fatal(): raise KafkaException(error) retries_left -= 1 continue fetched_messages += 1 topic = msg.topic() assert topic.startswith(f"{kafka_prefix}.") or topic.startswith( f"{kafka_prefix}_privileged." ), "Unexpected topic" object_type = topic[len(kafka_prefix + ".") :] consumed_messages[object_type].append( (kafka_to_key(msg.key()), kafka_to_value(msg.value())) ) return consumed_messages def assert_all_objects_consumed( consumed_messages: Dict, exclude: Optional[Collection] = None ): """Check whether all objects from TEST_OBJECTS have been consumed `exclude` can be a list of object types for which we do not want to compare the values (eg. for anonymized object). """ for object_type, known_objects in TEST_OBJECTS.items(): known_keys = [obj.unique_key() for obj in known_objects] if not consumed_messages[object_type]: return (received_keys, received_values) = zip(*consumed_messages[object_type]) if object_type in ("content", "skipped_content"): for value in received_values: value.pop("ctime", None) if object_type == "content": known_objects = [attr.evolve(o, data=None) for o in known_objects] for key in known_keys: assert key in received_keys, ( f"expected {object_type} key {pprint_key(key)} " "absent from consumed messages" ) if exclude and object_type in exclude: continue for value in known_objects: expected_value = value.to_dict() if value.object_type in ("content", "skipped_content"): expected_value.pop("ctime", None) assert ensure_lists(expected_value) in received_values, ( f"expected {object_type} value {value!r} is " "absent from consumed messages" ) @pytest.fixture(scope="function") def kafka_prefix(): """Pick a random prefix for kafka topics on each call""" return "".join(random.choice(string.ascii_lowercase) for _ in range(10)) @pytest.fixture(scope="function") def kafka_consumer_group(kafka_prefix: str): """Pick a random consumer group for kafka consumers on each call""" return "test-consumer-%s" % kafka_prefix @pytest.fixture(scope="function") def object_types(): """Set of object types to precreate topics for.""" return set(TEST_OBJECTS.keys()) @pytest.fixture(scope="function") def privileged_object_types(): """Set of object types to precreate privileged topics for.""" return {"revision", "release"} @pytest.fixture(scope="function") def kafka_server( kafka_server_base: str, kafka_prefix: str, object_types: Iterator[str], privileged_object_types: Iterator[str], ) -> str: """A kafka server with existing topics Unprivileged topics are built as ``{kafka_prefix}.{object_type}`` with object_type from the ``object_types`` list. 
Privileged topics are built as ``{kafka_prefix}_privileged.{object_type}`` with object_type from the ``privileged_object_types`` list. """ topics = [f"{kafka_prefix}.{obj}" for obj in object_types] + [ f"{kafka_prefix}_privileged.{obj}" for obj in privileged_object_types ] # unfortunately, the Mock broker does not support the CreateTopic admin API, so we # have to create topics using a Producer. producer = Producer( { "bootstrap.servers": kafka_server_base, "client.id": "bootstrap producer", "acks": "all", } ) for topic in topics: producer.produce(topic=topic, value=None) for i in range(10): if producer.flush(0.1) == 0: break return kafka_server_base @pytest.fixture(scope="session") def kafka_server_base() -> Iterator[str]: """Create a mock kafka cluster suitable for tests. Yield a connection string. Note: this is a generator to keep the mock broker alive during the whole test session. See https://github.com/edenhill/librdkafka/blob/master/src/rdkafka_mock.h """ admin = AdminClient({"test.mock.num.brokers": "1"}) metadata = admin.list_topics() brokers = [str(broker) for broker in metadata.brokers.values()] assert len(brokers) == 1, "More than one broker found in the kafka cluster?!" broker_connstr, broker_id = brokers[0].split("/") yield broker_connstr TEST_CONFIG = { "consumer_id": "swh.journal.consumer", "stop_on_eof": True, "storage": {"cls": "memory", "args": {}}, } @pytest.fixture def test_config( kafka_server_base: str, kafka_prefix: str, object_types: Iterator[str], privileged_object_types: Iterator[str], ): """Test configuration needed for producer/consumer """ return { **TEST_CONFIG, "object_types": object_types, "privileged_object_types": privileged_object_types, "brokers": [kafka_server_base], "prefix": kafka_prefix, } @pytest.fixture def consumer( kafka_server: str, test_config: Dict, kafka_consumer_group: str ) -> Consumer: """Get a connected Kafka consumer.
""" consumer = Consumer( { "bootstrap.servers": kafka_server, "auto.offset.reset": "earliest", "enable.auto.commit": True, "group.id": kafka_consumer_group, } ) prefix = test_config["prefix"] kafka_topics = [ f"{prefix}.{object_type}" for object_type in test_config["object_types"] ] + [ f"{prefix}_privileged.{object_type}" for object_type in test_config["privileged_object_types"] ] consumer.subscribe(kafka_topics) yield consumer # Explicitly perform the commit operation on the consumer before closing it # to avoid possible hang since confluent-kafka v1.6.0 consumer.commit() consumer.close() diff --git a/tox.ini b/tox.ini index 9c0c890..4f23cf2 100644 --- a/tox.ini +++ b/tox.ini @@ -1,75 +1,75 @@ [tox] envlist=black,flake8,mypy,py3 [testenv] extras = testing deps = pytest-cov dev: pdbpp commands = pytest --cov={envsitepackagesdir}/swh/journal \ {envsitepackagesdir}/swh/journal \ --cov-branch \ --doctest-modules {posargs} [testenv:black] skip_install = true deps = black==19.10b0 commands = {envpython} -m black --check swh [testenv:flake8] skip_install = true deps = git+https://github.com/PyCQA/pyflakes.git flake8 commands = {envpython} -m flake8 [testenv:mypy] extras = testing deps = - mypy + mypy==0.920 commands = mypy swh # build documentation outside swh-environment using the current # git HEAD of swh-docs, is executed on CI for each diff to prevent # breaking doc build [testenv:sphinx] whitelist_externals = make usedevelop = true extras = testing deps = # fetch and install swh-docs in develop mode -e git+https://forge.softwareheritage.org/source/swh-docs#egg=swh.docs setenv = SWH_PACKAGE_DOC_TOX_BUILD = 1 # turn warnings into errors SPHINXOPTS = -W commands = make -I ../.tox/sphinx/src/swh-docs/swh/ -C docs # build documentation only inside swh-environment using local state # of swh-docs package [testenv:sphinx-dev] whitelist_externals = make usedevelop = true extras = testing deps = # install swh-docs in develop mode -e ../swh-docs setenv = SWH_PACKAGE_DOC_TOX_BUILD = 1 # turn warnings into errors SPHINXOPTS = -W commands = make -I ../.tox/sphinx-dev/src/swh-docs/swh/ -C docs