diff --git a/CONTRIBUTORS b/CONTRIBUTORS new file mode 100644 index 0000000..b97d981 --- /dev/null +++ b/CONTRIBUTORS @@ -0,0 +1 @@ +Kumar Shivendu diff --git a/Makefile.local b/Makefile.local deleted file mode 100644 index 81e402e..0000000 --- a/Makefile.local +++ /dev/null @@ -1,15 +0,0 @@ -install: - bin/install-kafka.sh install - -clean: - bin/install-kafka.sh clean - -clean-all: - bin/install-kafka.sh clean - bin/install-kafka.sh clean-cache - -test-fast: - pytest - -test: - tox diff --git a/PKG-INFO b/PKG-INFO index 2c77a37..1b4f654 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,72 +1,76 @@ Metadata-Version: 2.1 Name: swh.journal -Version: 0.7.1 +Version: 0.8.0 Summary: Software Heritage Journal utilities Home-page: https://forge.softwareheritage.org/diffusion/DJNL/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-journal Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-journal/ -Description: swh-journal - =========== - - Persistent logger of changes to the archive, with publish-subscribe support. - - See the - [documentation](https://docs.softwareheritage.org/devel/swh-journal/index.html#software-heritage-journal) - for more details. - - # Local test - - As a pre-requisite, you need a kakfa installation path. - The following target will take care of this: - - ``` - make install - ``` - - Then, provided you are in the right virtual environment as described - in the [swh getting-started](https://docs.softwareheritage.org/devel/developer-setup.html#developer-setup): - - ``` - pytest - ``` - - or: - - ``` - tox - ``` - - - # Running - - ## publisher - - Command: - ``` - $ swh-journal --config-file ~/.config/swh/journal/publisher.yml \ - publisher - ``` - - # Auto-completion - - To have the completion, add the following in your - ~/.virtualenvs/swh/bin/postactivate: - - ``` - eval "$(_SWH_JOURNAL_COMPLETE=$autocomplete_cmd swh-journal)" - ``` - Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing +License-File: LICENSE +License-File: AUTHORS + +swh-journal +=========== + +Persistent logger of changes to the archive, with publish-subscribe support. + +See the +[documentation](https://docs.softwareheritage.org/devel/swh-journal/index.html#software-heritage-journal) +for more details. + +# Local test + +As a pre-requisite, you need a kakfa installation path. +The following target will take care of this: + +``` +make install +``` + +Then, provided you are in the right virtual environment as described +in the [swh getting-started](https://docs.softwareheritage.org/devel/developer-setup.html#developer-setup): + +``` +pytest +``` + +or: + +``` +tox +``` + + +# Running + +## publisher + +Command: +``` +$ swh-journal --config-file ~/.config/swh/journal/publisher.yml \ + publisher +``` + +# Auto-completion + +To have the completion, add the following in your +~/.virtualenvs/swh/bin/postactivate: + +``` +eval "$(_SWH_JOURNAL_COMPLETE=$autocomplete_cmd swh-journal)" +``` + + diff --git a/debian/changelog b/debian/changelog index 89497e7..1f82976 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,499 +1,501 @@ -swh-journal (0.7.1-1~swh1~bpo10+1) buster-swh; urgency=medium +swh-journal (0.8.0-1~swh1) unstable-swh; urgency=medium - * Rebuild for buster-swh + * New upstream release 0.8.0 - (tagged by David Douard + on 2021-06-18 11:23:06 +0200) + * Upstream changes: - v0.8.0 - -- Software Heritage autobuilder (on jenkins-debian1) Tue, 09 Feb 2021 14:47:05 +0000 + -- Software Heritage autobuilder (on jenkins-debian1) Fri, 18 Jun 2021 10:17:01 +0000 swh-journal (0.7.1-1~swh1) unstable-swh; urgency=medium * New upstream release 0.7.1 - (tagged by Antoine Lambert on 2021-02-09 15:41:45 +0100) * Upstream changes: - version 0.7.1 -- Software Heritage autobuilder (on jenkins-debian1) Tue, 09 Feb 2021 14:46:12 +0000 swh-journal (0.7.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.7.0 - (tagged by David Douard on 2021-01-27 15:38:49 +0100) * Upstream changes: - v0.7.0 -- Software Heritage autobuilder (on jenkins-debian1) Wed, 27 Jan 2021 14:42:51 +0000 swh-journal (0.6.2-1~swh1) unstable-swh; urgency=medium * New upstream release 0.6.2 - (tagged by Vincent SELLIER on 2021-01-14 14:28:31 +0100) * Upstream changes: - v0.6.2 - * 2021-01-13 Add new field OriginVisitStatus.type field on test data -- Software Heritage autobuilder (on jenkins-debian1) Thu, 14 Jan 2021 13:31:15 +0000 swh-journal (0.6.1-1~swh1) unstable-swh; urgency=medium * New upstream release 0.6.1 - (tagged by Valentin Lorentz on 2020-12-21 11:29:15 +0100) * Upstream changes: - v0.6.1 - * serializers: Deserialize as lists instead of tuples - * Fix dependency on msgpack (>= 1.0.0) - * Blacklist msgpack 1.0.1. -- Software Heritage autobuilder (on jenkins-debian1) Mon, 21 Dec 2020 10:31:53 +0000 swh-journal (0.6.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.6.0 - (tagged by David Douard on 2020-12-08 15:27:18 +0100) * Upstream changes: - v0.6.0 -- Software Heritage autobuilder (on jenkins-debian1) Tue, 08 Dec 2020 14:31:08 +0000 swh-journal (0.5.1-1~swh1) unstable-swh; urgency=medium * New upstream release 0.5.1 - (tagged by Valentin Lorentz on 2020-11-05 15:17:23 +0100) * Upstream changes: - v0.5.1 - This release makes the JournalWriter usable by swh-indexer-storage: - * Make value_sanitizer an argument of JournalWriter. - * Make the type of values of JournalWriter generic, so it works with types not from swh-model. -- Software Heritage autobuilder (on jenkins-debian1) Thu, 05 Nov 2020 14:20:17 +0000 swh-journal (0.5.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.5.0 - (tagged by Nicolas Dandrimont on 2020-10-27 16:01:22 +0100) * Upstream changes: - Release swh.journal v0.5.0 - Use the model-provided unique_key as deduplication key - Drop long- deprecated swh.journal.cli module - Use RawExtrinsicMetadata.target attribute instead of .id - CI changes: pin black version, use upstream flake8 hook -- Software Heritage autobuilder (on jenkins-debian1) Tue, 27 Oct 2020 15:05:44 +0000 swh-journal (0.4.3-1~swh1) unstable-swh; urgency=medium * New upstream release 0.4.3 - (tagged by David Douard on 2020-09-25 11:51:00 +0200) * Upstream changes: - v0.4.3 -- Software Heritage autobuilder (on jenkins-debian1) Fri, 25 Sep 2020 09:53:37 +0000 swh-journal (0.4.2-1~swh2) unstable-swh; urgency=medium * Fix debian dependencies -- Antoine R. Dumont (@ardumont)) Fri, 07 Aug 2020 09:38:25 +0000 swh-journal (0.4.2-1~swh1) unstable-swh; urgency=medium * New upstream release 0.4.2 - (tagged by Antoine R. Dumont (@ardumont) on 2020-08-07 10:58:45 +0200) * Upstream changes: - v0.4.2 - pytest_plugin: Deal with the case when ctime is dropped - setup.py: Migrate from vcversioner to setuptools-scm -- Software Heritage autobuilder (on jenkins-debian1) Fri, 07 Aug 2020 09:01:25 +0000 swh-journal (0.4.1-1~swh1) unstable-swh; urgency=medium * New upstream release 0.4.1 - (tagged by Valentin Lorentz on 2020-07-31 11:25:19 +0200) * Upstream changes: - v0.4.1 - * Remove TEST_OBJECT_DICTS, use only TEST_OBJECTS. - * Add support for MetadataAuthority, MetadataFetcher, and RawExtrinsicMetadata. -- Software Heritage autobuilder (on jenkins-debian1) Fri, 31 Jul 2020 09:31:32 +0000 swh-journal (0.4.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.4.0 - (tagged by David Douard on 2020-07-06 13:49:40 +0200) * Upstream changes: - v0.4.0 -- Software Heritage autobuilder (on jenkins-debian1) Mon, 06 Jul 2020 11:58:06 +0000 swh-journal (0.3.5-1~swh1) unstable-swh; urgency=medium * New upstream release 0.3.5 - (tagged by Antoine R. Dumont (@ardumont) on 2020-07-01 15:55:31 +0200) * Upstream changes: - v0.3.5 - journal_data: Drop obsolete origin_visit fields - Use proper hash ids in tests' journal_data -- Software Heritage autobuilder (on jenkins-debian1) Wed, 01 Jul 2020 14:00:27 +0000 swh-journal (0.3.4-1~swh1) unstable-swh; urgency=medium * New upstream release 0.3.4 - (tagged by Antoine R. Dumont (@ardumont) on 2020-06-25 10:10:16 +0200) * Upstream changes: - v0.3.4 - Drop datetime conversion indirection -- Software Heritage autobuilder (on jenkins-debian1) Thu, 25 Jun 2020 08:13:02 +0000 swh-journal (0.3.3-1~swh1) unstable-swh; urgency=medium * New upstream release 0.3.3 - (tagged by Antoine R. Dumont (@ardumont) on 2020-06-25 09:34:32 +0200) * Upstream changes: - v0.3.3 - journal_data: Make origin-visit optional fields to None -- Software Heritage autobuilder (on jenkins-debian1) Thu, 25 Jun 2020 07:36:28 +0000 swh-journal (0.3.2-1~swh1) unstable-swh; urgency=medium * New upstream release 0.3.2 - (tagged by David Douard on 2020-06-17 09:29:44 +0200) * Upstream changes: - v0.3.2 -- Software Heritage autobuilder (on jenkins-debian1) Wed, 17 Jun 2020 07:37:10 +0000 swh-journal (0.3.1-1~swh1) unstable-swh; urgency=medium * New upstream release 0.3.1 - (tagged by Antoine R. Dumont (@ardumont) on 2020-06-10 10:54:44 +0200) * Upstream changes: - v0.3.1 - pytest_plugin: pprint key when assertion is not respected -- Software Heritage autobuilder (on jenkins-debian1) Wed, 10 Jun 2020 08:56:57 +0000 swh-journal (0.3.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.3.0 - (tagged by Antoine R. Dumont (@ardumont) on 2020-06-09 10:42:30 +0200) * Upstream changes: - v0.3.0 - Allow journal to deal with origin_visit_status - test: Use origin-visit date field as datetime to phase out iso8601 str -- Software Heritage autobuilder (on jenkins-debian1) Tue, 09 Jun 2020 08:44:44 +0000 swh-journal (0.2.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.2.0 - (tagged by David Douard on 2020-06-03 13:50:54 +0200) * Upstream changes: - v0.2.0 -- Software Heritage autobuilder (on jenkins-debian1) Wed, 03 Jun 2020 12:00:31 +0000 swh-journal (0.1.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.1.0 - (tagged by David Douard on 2020-05-07 10:33:18 +0200) * Upstream changes: - v0.1.0 -- Software Heritage autobuilder (on jenkins-debian1) Thu, 07 May 2020 08:35:49 +0000 swh-journal (0.0.32-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.32 - (tagged by Antoine R. Dumont (@ardumont) on 2020-05-04 18:02:45 +0200) * Upstream changes: - v0.0.32 - serializers: Make kafka_to_key implem compatible with stable version - setup.py: add documentation link - Remove the content replayer code - Remove the backfiller and the (storage) replayer -- Software Heritage autobuilder (on jenkins-debian1) Mon, 04 May 2020 16:04:38 +0000 swh-journal (0.0.31-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.31 - (tagged by David Douard on 2020-04-23 12:27:47 +0200) * Upstream changes: - v0.0.31 -- Software Heritage autobuilder (on jenkins-debian1) Thu, 23 Apr 2020 10:37:28 +0000 swh-journal (0.0.30-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.30 - (tagged by Nicolas Dandrimont on 2020-04-14 15:57:34 +0200) * Upstream changes: - Release swh.journal v0.0.30 - various test refactorings - enable black on the source code - accept swh.model objects in the journal writers - listen to delivery notifications after writing messages - default to message.max.bytes = 100 MB in the kafka writer -- Software Heritage autobuilder (on jenkins-debian1) Tue, 14 Apr 2020 14:10:54 +0000 swh-journal (0.0.29-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.29 - (tagged by Antoine R. Dumont (@ardumont) on 2020-03-27 10:40:46 +0100) * Upstream changes: - v0.0.29 - replayer: Allow legacy origin to be replayed - tests.utils: Test from modules using this need to call close method - tests: Adapt model according to latest change -- Software Heritage autobuilder (on jenkins-debian1) Fri, 27 Mar 2020 09:51:08 +0000 swh-journal (0.0.28-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.28 - (tagged by Antoine R. Dumont (@ardumont) on 2020-03-25 10:14:47 +0100) * Upstream changes: - v0.0.28 - journal.replay: Migrate to latest HashCollision change - replayer: factor out legacy objects fixers - journal.replay: Inline `_fix_origin_visit` for loop in insert_object - journal.replay: Align _fix_content with other fix methods - journal.replay: Align fix revision behavior to other fix methods - Remove extra 'perms' key of contents when replaying. -- Software Heritage autobuilder (on jenkins-debian1) Wed, 25 Mar 2020 09:23:55 +0000 swh-journal (0.0.27-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.27 - (tagged by Antoine R. Dumont (@ardumont) on 2020-03-16 16:09:33 +0100) * Upstream changes: - v0.0.27 - Migrate to latest origin_visit_upsert/add api changes - journal: Use swh-model objects instead of dicts in replay and writer - replay: Filter out colliding contents when replaying - Use better kafka producer semantics in the journal writers - Make the number of messages processed at a time by journal clients - configurable - Drop deprecated cli options - Replace deprecated options with a config file override in cli tests - Clean up the signature of test_cli's invoke method - Migrate test cli config to a dict instead of raw yaml - kafka: normalize KafkaJournalWriter.write_addition[s] API - Rename JournalClient.max_messages to JournalClient.stop_after_objects - Be more careful with content generation in test_write_replay - Add type annotations to swh.journal.client arguments - Unify tense for content replay statistics log entry - Add missing log4j.properties file from MANIFEST.in - Unify retry/error handling for content replay -- Software Heritage autobuilder (on jenkins-debian1) Mon, 16 Mar 2020 15:17:43 +0000 swh-journal (0.0.26-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.26 - (tagged by David Douard on 2020-03-06 15:36:23 +0100) * Upstream changes: - v0.0.26 -- Software Heritage autobuilder (on jenkins-debian1) Fri, 06 Mar 2020 14:47:18 +0000 swh-journal (0.0.25-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.25 - (tagged by Valentin Lorentz on 2020-01-30 15:35:41 +0100) * Upstream changes: - v0.0.25 - * Add support for swh-storage v0.0.168. - * Accept None dates when validating revisions. -- Software Heritage autobuilder (on jenkins-debian1) Thu, 30 Jan 2020 14:43:25 +0000 swh-journal (0.0.24-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.24 - (tagged by Antoine Lambert on 2020-01-06 16:14:32 +0100) * Upstream changes: - version 0.0.24 -- Software Heritage autobuilder (on jenkins-debian1) Mon, 06 Jan 2020 15:20:54 +0000 swh-journal (0.0.23-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.23 - (tagged by Nicolas Dandrimont on 2020-01-03 20:02:39 +0100) * Upstream changes: - Release swh.journal v0.0.23 - support short-hand syntax for initializing the journal direct writer -- Software Heritage autobuilder (on jenkins-debian1) Fri, 03 Jan 2020 19:06:31 +0000 swh-journal (0.0.21-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.21 - (tagged by David Douard on 2019-11-29 15:37:49 +0100) * Upstream changes: - v0.0.21 -- Software Heritage autobuilder (on jenkins-debian1) Mon, 02 Dec 2019 14:12:49 +0000 swh-journal (0.0.20-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.20 - (tagged by David Douard on 2019-11-29 12:06:13 +0100) * Upstream changes: - v0.0.20 -- Software Heritage autobuilder (on jenkins-debian1) Fri, 29 Nov 2019 11:10:44 +0000 swh-journal (0.0.19-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.19 - (tagged by Nicolas Dandrimont on 2019-11-07 14:33:58 +0100) * Upstream changes: - Release swh.journal v0.0.19 - Merge several reliability fixes -- Software Heritage autobuilder (on jenkins-debian1) Thu, 07 Nov 2019 13:37:23 +0000 swh-journal (0.0.18-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.18 - (tagged by Stefano Zacchiroli on 2019-10-01 10:19:26 +0200) * Upstream changes: - v0.0.18 - * tox: anticipate mypy run to just after flake8 - * init.py: switch to documented way of extending path - * typing: minimal changes to make a no-op mypy run pass - * writer: Normalize 'cls' value to 'memory' for in- memory instantiation - * Add a test directly for the journal client. -- Software Heritage autobuilder (on jenkins-debian1) Tue, 01 Oct 2019 11:12:03 +0000 swh-journal (0.0.17-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.17 - (tagged by Nicolas Dandrimont on 2019-09-18 18:01:50 +0200) * Upstream changes: - Release swh.journal v0.0.17 - Cleanup fallout from confluent_kafka migration - Better error handling and logging in direct_writer - Backfiller fixups - More extensive mock for KafkaConsumer -- Software Heritage autobuilder (on jenkins-debian1) Wed, 18 Sep 2019 16:08:20 +0000 swh-journal (0.0.16-1~swh2) unstable-swh; urgency=medium * Migrate to confluent-kafka -- Nicolas Dandrimont Fri, 13 Sep 2019 20:11:24 +0200 swh-journal (0.0.16-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.16 - (tagged by Nicolas Dandrimont on 2019-09-13 14:28:50 +0200) * Upstream changes: - Release swh.journal v0.0.16: - Migrate to confluent-kafka from python-kafka -- Software Heritage autobuilder (on jenkins-debian1) Fri, 13 Sep 2019 12:34:35 +0000 swh-journal (0.0.15-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.15 - (tagged by David Douard on 2019-09-10 16:50:42 +0200) * Upstream changes: - v0.0.15 -- Software Heritage autobuilder (on jenkins-debian1) Tue, 10 Sep 2019 14:53:55 +0000 swh-journal (0.0.14-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.14 - (tagged by David Douard on 2019-07-18 13:38:39 +0200) * Upstream changes: - 0.0.14 - Code of conduct - fix the backfiller - fix compatibility with click < 7 - make the replayer robust against old formats -- Software Heritage autobuilder (on jenkins-debian1) Thu, 18 Jul 2019 11:44:40 +0000 swh-journal (0.0.13-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.13 - (tagged by Antoine R. Dumont (@ardumont) on 2019-07-03 10:26:29 +0200) * Upstream changes: - v0.0.13 - cli: Document depreated options -- Software Heritage autobuilder (on jenkins-debian1) Wed, 03 Jul 2019 08:33:57 +0000 swh-journal (0.0.12-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.12 - (tagged by Valentin Lorentz on 2019-07-02 11:58:00 +0200) * Upstream changes: - v0.0.12 - More CLI option - Replay parallelism - Fix build on Debian 9 -- Software Heritage autobuilder (on jenkins-debian1) Tue, 02 Jul 2019 10:08:07 +0000 swh-journal (0.0.11-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.11 - (tagged by David Douard on 2019-06-12 13:58:14 +0200) * Upstream changes: - v0.0.11 -- Software Heritage autobuilder (on jenkins-debian1) Wed, 12 Jun 2019 12:10:52 +0000 swh-journal (0.0.10-1~swh2) unstable-swh; urgency=medium * Disable tests at build-time -- Nicolas Dandrimont Thu, 09 May 2019 14:42:24 +0200 swh-journal (0.0.10-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.10 - (tagged by Nicolas Dandrimont on 2019-05-09 14:29:52 +0200) * Upstream changes: - Release swh.journal v0.0.10 - Remove the publisher component, introduce the backfiller component. -- Software Heritage autobuilder (on jenkins-debian1) Thu, 09 May 2019 12:34:36 +0000 swh-journal (0.0.9-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.9 - (tagged by David Douard on 2019-04-10 13:42:32 +0200) * Upstream changes: - v0.0.9 -- Software Heritage autobuilder (on jenkins-debian1) Wed, 10 Apr 2019 11:48:39 +0000 swh-journal (0.0.8-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.8 - (tagged by Antoine R. Dumont (@ardumont) on 2019-03-15 13:56:57 +0100) * Upstream changes: - v0.0.8 - Add swh-journal cli -- Software Heritage autobuilder (on jenkins-debian1) Fri, 15 Mar 2019 13:00:11 +0000 swh-journal (0.0.7-1~swh2) unstable-swh; urgency=low * New release fixing build dependencies -- Antoine Romain Dumont (@ardumont) Tue, 19 Feb 2019 14:18:06 +0100 swh-journal (0.0.7-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.7 - (tagged by Antoine R. Dumont (@ardumont) on 2019-01-11 11:53:44 +0100) * Upstream changes: - v0.0.7 - Fix off-by-one error when checking max_messages. - tests: Adapt tests according to latest in-memory storage changes -- Software Heritage autobuilder (on jenkins-debian1) Fri, 11 Jan 2019 10:56:46 +0000 swh-journal (0.0.4-1~swh1) unstable-swh; urgency=medium * Release swh.journal version 0.0.4 * Update packaging runes -- Nicolas Dandrimont Thu, 12 Oct 2017 19:01:53 +0200 swh-journal (0.0.3-1~swh1) unstable-swh; urgency=medium * Release swh.journal v0.0.3 * Prepare building for stretch -- Nicolas Dandrimont Fri, 30 Jun 2017 17:29:15 +0200 swh-journal (0.0.2-1~swh1) unstable-swh; urgency=medium * v0.0.2 * Adapt swh.journal.publisher * Adapt swh.journal.client * Add swh.journal.checker basic implementation (reads and sends all * objects to publisher's subscribed queues). -- Antoine R. Dumont (@ardumont) Fri, 24 Mar 2017 12:54:16 +0100 swh-journal (0.0.1-1~swh1) unstable-swh; urgency=medium * Initial release * v0.0.1 * Add a journal publisher * Add a base class interface for journal clients -- Antoine R. Dumont (@ardumont) Tue, 21 Mar 2017 14:38:13 +0100 diff --git a/debian/rules b/debian/rules index e2f0bd1..5a0aa59 100755 --- a/debian/rules +++ b/debian/rules @@ -1,14 +1,11 @@ #!/usr/bin/make -f export PYBUILD_NAME=swh.journal -export PYBUILD_TEST_ARGS=-m 'not db and not fs' +export PYBUILD_TEST_ARGS=-m 'not db and not fs' -p swh.journal.pytest_plugin %: dh $@ --with python3 --buildsystem=pybuild override_dh_install: dh_install rm -v $(CURDIR)/debian/python3-*/usr/lib/python*/dist-packages/swh/__init__.py - -override_dh_auto_test: - # pytest-kafka unavailable and painful to package diff --git a/swh.journal.egg-info/PKG-INFO b/swh.journal.egg-info/PKG-INFO index 2c77a37..1b4f654 100644 --- a/swh.journal.egg-info/PKG-INFO +++ b/swh.journal.egg-info/PKG-INFO @@ -1,72 +1,76 @@ Metadata-Version: 2.1 Name: swh.journal -Version: 0.7.1 +Version: 0.8.0 Summary: Software Heritage Journal utilities Home-page: https://forge.softwareheritage.org/diffusion/DJNL/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-journal Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-journal/ -Description: swh-journal - =========== - - Persistent logger of changes to the archive, with publish-subscribe support. - - See the - [documentation](https://docs.softwareheritage.org/devel/swh-journal/index.html#software-heritage-journal) - for more details. - - # Local test - - As a pre-requisite, you need a kakfa installation path. - The following target will take care of this: - - ``` - make install - ``` - - Then, provided you are in the right virtual environment as described - in the [swh getting-started](https://docs.softwareheritage.org/devel/developer-setup.html#developer-setup): - - ``` - pytest - ``` - - or: - - ``` - tox - ``` - - - # Running - - ## publisher - - Command: - ``` - $ swh-journal --config-file ~/.config/swh/journal/publisher.yml \ - publisher - ``` - - # Auto-completion - - To have the completion, add the following in your - ~/.virtualenvs/swh/bin/postactivate: - - ``` - eval "$(_SWH_JOURNAL_COMPLETE=$autocomplete_cmd swh-journal)" - ``` - Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing +License-File: LICENSE +License-File: AUTHORS + +swh-journal +=========== + +Persistent logger of changes to the archive, with publish-subscribe support. + +See the +[documentation](https://docs.softwareheritage.org/devel/swh-journal/index.html#software-heritage-journal) +for more details. + +# Local test + +As a pre-requisite, you need a kakfa installation path. +The following target will take care of this: + +``` +make install +``` + +Then, provided you are in the right virtual environment as described +in the [swh getting-started](https://docs.softwareheritage.org/devel/developer-setup.html#developer-setup): + +``` +pytest +``` + +or: + +``` +tox +``` + + +# Running + +## publisher + +Command: +``` +$ swh-journal --config-file ~/.config/swh/journal/publisher.yml \ + publisher +``` + +# Auto-completion + +To have the completion, add the following in your +~/.virtualenvs/swh/bin/postactivate: + +``` +eval "$(_SWH_JOURNAL_COMPLETE=$autocomplete_cmd swh-journal)" +``` + + diff --git a/swh.journal.egg-info/SOURCES.txt b/swh.journal.egg-info/SOURCES.txt index 1dd0955..15665ce 100644 --- a/swh.journal.egg-info/SOURCES.txt +++ b/swh.journal.egg-info/SOURCES.txt @@ -1,46 +1,49 @@ .gitignore .pre-commit-config.yaml AUTHORS CODE_OF_CONDUCT.md +CONTRIBUTORS LICENSE MANIFEST.in Makefile -Makefile.local README.md mypy.ini pyproject.toml pytest.ini requirements-swh.txt requirements-test.txt requirements.txt setup.cfg setup.py tox.ini docs/.gitignore docs/Makefile docs/conf.py docs/index.rst docs/_static/.placeholder docs/_templates/.placeholder swh/__init__.py swh.journal.egg-info/PKG-INFO swh.journal.egg-info/SOURCES.txt swh.journal.egg-info/dependency_links.txt swh.journal.egg-info/entry_points.txt swh.journal.egg-info/requires.txt swh.journal.egg-info/top_level.txt swh/journal/__init__.py swh/journal/client.py swh/journal/py.typed swh/journal/pytest_plugin.py swh/journal/serializers.py swh/journal/tests/__init__.py swh/journal/tests/journal_data.py swh/journal/tests/log4j.properties swh/journal/tests/test_client.py +swh/journal/tests/test_inmemory.py swh/journal/tests/test_kafka_writer.py swh/journal/tests/test_pytest_plugin.py swh/journal/tests/test_serializers.py +swh/journal/tests/test_stream.py swh/journal/writer/__init__.py swh/journal/writer/inmemory.py -swh/journal/writer/kafka.py \ No newline at end of file +swh/journal/writer/kafka.py +swh/journal/writer/stream.py \ No newline at end of file diff --git a/swh/__init__.py b/swh/__init__.py index f14e196..8d9f151 100644 --- a/swh/__init__.py +++ b/swh/__init__.py @@ -1,4 +1,4 @@ from pkgutil import extend_path -from typing import Iterable +from typing import List -__path__ = extend_path(__path__, __name__) # type: Iterable[str] +__path__: List[str] = extend_path(__path__, __name__) diff --git a/swh/journal/tests/test_inmemory.py b/swh/journal/tests/test_inmemory.py new file mode 100644 index 0000000..c414a9b --- /dev/null +++ b/swh/journal/tests/test_inmemory.py @@ -0,0 +1,48 @@ +import pytest + +from swh.journal.writer import model_object_dict_sanitizer +from swh.journal.writer.inmemory import InMemoryJournalWriter +from swh.model.model import BaseModel +from swh.model.tests.swh_model_data import TEST_OBJECTS + + +def test_write_additions_with_test_objects(): + writer = InMemoryJournalWriter[BaseModel]( + value_sanitizer=model_object_dict_sanitizer + ) + expected = [] + + for object_type, objects in TEST_OBJECTS.items(): + writer.write_additions(object_type, objects) + + for object in objects: + expected.append((object_type, object)) + + assert list(writer.privileged_objects) == [] + assert set(expected) == set(writer.objects) + + +def test_write_additions_with_privileged_test_objects(): + writer = InMemoryJournalWriter[BaseModel]( + value_sanitizer=model_object_dict_sanitizer + ) + + expected = [] + + for object_type, objects in TEST_OBJECTS.items(): + writer.write_additions(object_type, objects, True) + + for object in objects: + expected.append((object_type, object)) + + assert list(writer.objects) == [] + assert set(expected) == set(writer.privileged_objects) + + +def test_write_addition_errors_without_unique_key(): + writer = InMemoryJournalWriter[BaseModel]( + value_sanitizer=model_object_dict_sanitizer + ) + + with pytest.raises(NotImplementedError): + writer.write_addition("BaseModel", BaseModel()) diff --git a/swh/journal/tests/test_kafka_writer.py b/swh/journal/tests/test_kafka_writer.py index db81330..241e767 100644 --- a/swh/journal/tests/test_kafka_writer.py +++ b/swh/journal/tests/test_kafka_writer.py @@ -1,172 +1,248 @@ # Copyright (C) 2018-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import logging from typing import Iterable from confluent_kafka import Consumer, Producer import pytest from swh.journal.pytest_plugin import assert_all_objects_consumed, consume_messages from swh.journal.writer import model_object_dict_sanitizer from swh.journal.writer.kafka import KafkaDeliveryError, KafkaJournalWriter from swh.model.model import BaseModel, Directory, Release, Revision from swh.model.tests.swh_model_data import TEST_OBJECTS def test_kafka_writer( kafka_prefix: str, kafka_server: str, consumer: Consumer, privileged_object_types: Iterable[str], ): writer = KafkaJournalWriter[BaseModel]( brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix, value_sanitizer=model_object_dict_sanitizer, anonymize=False, ) expected_messages = 0 for object_type, objects in TEST_OBJECTS.items(): writer.write_additions(object_type, objects) expected_messages += len(objects) consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages) assert_all_objects_consumed(consumed_messages) for key, obj_dict in consumed_messages["revision"]: obj = Revision.from_dict(obj_dict) for person in (obj.author, obj.committer): assert not ( len(person.fullname) == 32 and person.name is None and person.email is None ) for key, obj_dict in consumed_messages["release"]: obj = Release.from_dict(obj_dict) + # author is optional for release + if obj.author is None: + continue for person in (obj.author,): assert not ( len(person.fullname) == 32 and person.name is None and person.email is None ) def test_kafka_writer_anonymized( kafka_prefix: str, kafka_server: str, consumer: Consumer, privileged_object_types: Iterable[str], ): writer = KafkaJournalWriter[BaseModel]( brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix, value_sanitizer=model_object_dict_sanitizer, anonymize=True, ) expected_messages = 0 for object_type, objects in TEST_OBJECTS.items(): writer.write_additions(object_type, objects) expected_messages += len(objects) if object_type in privileged_object_types: expected_messages += len(objects) consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages) assert_all_objects_consumed(consumed_messages, exclude=["revision", "release"]) for key, obj_dict in consumed_messages["revision"]: obj = Revision.from_dict(obj_dict) for person in (obj.author, obj.committer): assert ( len(person.fullname) == 32 and person.name is None and person.email is None ) for key, obj_dict in consumed_messages["release"]: obj = Release.from_dict(obj_dict) + # author is optional for release + if obj.author is None: + continue for person in (obj.author,): assert ( len(person.fullname) == 32 and person.name is None and person.email is None ) -def test_write_delivery_failure( - kafka_prefix: str, kafka_server: str, consumer: Consumer -): +def test_write_delivery_failure(kafka_prefix: str, kafka_server: str): class MockKafkaError: """A mocked kafka error""" def str(self): return "Mocked Kafka Error" def name(self): return "SWH_MOCK_ERROR" class KafkaJournalWriterFailDelivery(KafkaJournalWriter): """A journal writer which always fails delivering messages""" def _on_delivery(self, error, message): """Replace the inbound error with a fake delivery error""" super()._on_delivery(MockKafkaError(), message) kafka_prefix += ".swh.journal.objects" writer = KafkaJournalWriterFailDelivery( brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix, value_sanitizer=model_object_dict_sanitizer, ) empty_dir = Directory(entries=()) with pytest.raises(KafkaDeliveryError) as exc: writer.write_addition("directory", empty_dir) assert "Failed deliveries" in exc.value.message assert len(exc.value.delivery_failures) == 1 delivery_failure = exc.value.delivery_failures[0] assert delivery_failure.key == empty_dir.id assert delivery_failure.code == "SWH_MOCK_ERROR" -def test_write_delivery_timeout( - kafka_prefix: str, kafka_server: str, consumer: Consumer -): +def test_write_delivery_timeout(kafka_prefix: str, kafka_server: str): produced = [] class MockProducer(Producer): """A kafka producer which pretends to produce messages, but never sends any delivery acknowledgements""" def produce(self, **kwargs): produced.append(kwargs) writer = KafkaJournalWriter[BaseModel]( brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix, value_sanitizer=model_object_dict_sanitizer, flush_timeout=1, producer_class=MockProducer, ) empty_dir = Directory(entries=()) with pytest.raises(KafkaDeliveryError) as exc: writer.write_addition("directory", empty_dir) assert len(produced) == 1 assert "timeout" in exc.value.message assert len(exc.value.delivery_failures) == 1 delivery_failure = exc.value.delivery_failures[0] assert delivery_failure.key == empty_dir.id assert delivery_failure.code == "SWH_FLUSH_TIMEOUT" + + +class MockBufferErrorProducer(Producer): + """A Kafka producer that returns a BufferError on the `n_buffererrors` + first calls to produce.""" + + def __init__(self, *args, **kwargs): + self.n_buffererrors = kwargs.pop("n_bufferrors", 0) + self.produce_calls = 0 + + super().__init__(*args, **kwargs) + + def produce(self, **kwargs): + self.produce_calls += 1 + if self.produce_calls <= self.n_buffererrors: + raise BufferError("Local: Queue full") + + self.produce_calls = 0 + return super().produce(**kwargs) + + +def test_write_BufferError_retry(kafka_prefix: str, kafka_server: str, caplog): + writer = KafkaJournalWriter[BaseModel]( + brokers=[kafka_server], + client_id="kafka_writer", + prefix=kafka_prefix, + value_sanitizer=model_object_dict_sanitizer, + flush_timeout=1, + producer_class=MockBufferErrorProducer, + ) + + writer.producer.n_buffererrors = 4 + + empty_dir = Directory(entries=()) + + caplog.set_level(logging.DEBUG, "swh.journal.writer.kafka") + writer.write_addition("directory", empty_dir) + records = [] + for record in caplog.records: + if "BufferError" in record.getMessage(): + records.append(record) + + assert len(records) == writer.producer.n_buffererrors + + +def test_write_BufferError_give_up(kafka_prefix: str, kafka_server: str, caplog): + writer = KafkaJournalWriter[BaseModel]( + brokers=[kafka_server], + client_id="kafka_writer", + prefix=kafka_prefix, + value_sanitizer=model_object_dict_sanitizer, + flush_timeout=1, + producer_class=MockBufferErrorProducer, + ) + + writer.producer.n_buffererrors = 5 + + empty_dir = Directory(entries=()) + + with pytest.raises(KafkaDeliveryError): + writer.write_addition("directory", empty_dir) + + +def test_write_addition_errors_without_unique_key(kafka_prefix: str, kafka_server: str): + writer = KafkaJournalWriter[BaseModel]( + brokers=[kafka_server], + client_id="kafka_writer", + prefix=kafka_prefix, + value_sanitizer=model_object_dict_sanitizer, + ) + + with pytest.raises(NotImplementedError): + writer.write_addition("BaseModel", BaseModel()) diff --git a/swh/journal/tests/test_pytest_plugin.py b/swh/journal/tests/test_pytest_plugin.py index 02c0ef7..4070714 100644 --- a/swh/journal/tests/test_pytest_plugin.py +++ b/swh/journal/tests/test_pytest_plugin.py @@ -1,72 +1,73 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Iterator from confluent_kafka.admin import AdminClient def test_kafka_server(kafka_server_base: str): ip, port_str = kafka_server_base.split(":") assert ip == "127.0.0.1" assert int(port_str) admin = AdminClient({"bootstrap.servers": kafka_server_base}) topics = admin.list_topics() assert len(topics.brokers) == 1 def test_kafka_server_with_topics( kafka_server: str, kafka_prefix: str, object_types: Iterator[str], privileged_object_types: Iterator[str], ): admin = AdminClient({"bootstrap.servers": kafka_server}) # check unprivileged topics are present topics = { topic for topic in admin.list_topics().topics if topic.startswith(f"{kafka_prefix}.") } assert topics == {f"{kafka_prefix}.{obj}" for obj in object_types} # check privileged topics are present topics = { topic for topic in admin.list_topics().topics if topic.startswith(f"{kafka_prefix}_privileged.") } assert topics == { f"{kafka_prefix}_privileged.{obj}" for obj in privileged_object_types } def test_test_config(test_config: dict, kafka_prefix: str, kafka_server_base: str): assert test_config == { "consumer_id": "swh.journal.consumer", "stop_after_objects": 1, "storage": {"cls": "memory", "args": {}}, "object_types": { "content", "directory", + "extid", "metadata_authority", "metadata_fetcher", "origin", "origin_visit", "origin_visit_status", "raw_extrinsic_metadata", "release", "revision", "snapshot", "skipped_content", }, "privileged_object_types": {"release", "revision",}, "brokers": [kafka_server_base], "prefix": kafka_prefix, } diff --git a/swh/journal/tests/test_stream.py b/swh/journal/tests/test_stream.py new file mode 100644 index 0000000..c9bfc90 --- /dev/null +++ b/swh/journal/tests/test_stream.py @@ -0,0 +1,47 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import io + +import msgpack + +from swh.journal.serializers import msgpack_ext_hook +from swh.journal.writer import get_journal_writer, model_object_dict_sanitizer +from swh.model.tests.swh_model_data import TEST_OBJECTS + + +def test_write_additions_with_test_objects(): + outs = io.BytesIO() + + writer = get_journal_writer( + cls="stream", value_sanitizer=model_object_dict_sanitizer, output_stream=outs, + ) + expected = [] + + n = 0 + for object_type, objects in TEST_OBJECTS.items(): + writer.write_additions(object_type, objects) + + for object in objects: + objd = object.to_dict() + if object_type == "content": + objd.pop("data") + + expected.append((object_type, objd)) + n += len(objects) + + outs.seek(0, 0) + unpacker = msgpack.Unpacker( + outs, + raw=False, + ext_hook=msgpack_ext_hook, + strict_map_key=False, + use_list=False, + timestamp=3, # convert Timestamp in datetime objects (tz UTC) + ) + + for i, (objtype, objd) in enumerate(unpacker, start=1): + assert (objtype, objd) in expected + assert len(expected) == i diff --git a/swh/journal/writer/__init__.py b/swh/journal/writer/__init__.py index 92879e6..662fa80 100644 --- a/swh/journal/writer/__init__.py +++ b/swh/journal/writer/__init__.py @@ -1,58 +1,62 @@ -# Copyright (C) 2019 The Software Heritage developers +# Copyright (C) 2019-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Any, Dict, Optional, TypeVar import warnings from typing_extensions import Protocol from swh.model.model import KeyType TSelf = TypeVar("TSelf") class ValueProtocol(Protocol): def anonymize(self: TSelf) -> Optional[TSelf]: ... def unique_key(self) -> KeyType: ... def to_dict(self) -> Dict[str, Any]: ... def model_object_dict_sanitizer( object_type: str, object_dict: Dict[str, Any] ) -> Dict[str, str]: object_dict = object_dict.copy() if object_type == "content": object_dict.pop("data", None) return object_dict def get_journal_writer(cls, **kwargs): if "args" in kwargs: warnings.warn( 'Explicit "args" key is deprecated, use keys directly instead.', DeprecationWarning, ) kwargs = kwargs["args"] kwargs.setdefault("value_sanitizer", model_object_dict_sanitizer) if cls == "inmemory": # FIXME: Remove inmemory in due time warnings.warn( "cls = 'inmemory' is deprecated, use 'memory' instead", DeprecationWarning ) cls = "memory" if cls == "memory": from .inmemory import InMemoryJournalWriter as JournalWriter elif cls == "kafka": from .kafka import KafkaJournalWriter as JournalWriter + elif cls == "stream": + from .stream import StreamJournalWriter as JournalWriter + + assert "output_stream" in kwargs else: raise ValueError("Unknown journal writer class `%s`" % cls) return JournalWriter(**kwargs) diff --git a/swh/journal/writer/inmemory.py b/swh/journal/writer/inmemory.py index 4272834..69f63f8 100644 --- a/swh/journal/writer/inmemory.py +++ b/swh/journal/writer/inmemory.py @@ -1,43 +1,45 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging from multiprocessing import Manager -from typing import Any, Generic, List, Tuple, TypeVar +from typing import Any, Callable, Dict, Generic, List, Tuple, TypeVar from . import ValueProtocol logger = logging.getLogger(__name__) TValue = TypeVar("TValue", bound=ValueProtocol) class InMemoryJournalWriter(Generic[TValue]): objects: List[Tuple[str, TValue]] privileged_objects: List[Tuple[str, TValue]] - def __init__(self, value_sanitizer: Any): + def __init__( + self, value_sanitizer: Callable[[str, Dict[str, Any]], Dict[str, Any]] + ): # Share the list of objects across processes, for RemoteAPI tests. self.manager = Manager() self.objects = self.manager.list() self.privileged_objects = self.manager.list() def write_addition( self, object_type: str, object_: TValue, privileged: bool = False ) -> None: object_.unique_key() # Check this does not error, to mimic the kafka writer if privileged: self.privileged_objects.append((object_type, object_)) else: self.objects.append((object_type, object_)) write_update = write_addition def write_additions( self, object_type: str, objects: List[TValue], privileged: bool = False ) -> None: for object_ in objects: self.write_addition(object_type, object_, privileged) diff --git a/swh/journal/writer/kafka.py b/swh/journal/writer/kafka.py index 3df3efb..2434146 100644 --- a/swh/journal/writer/kafka.py +++ b/swh/journal/writer/kafka.py @@ -1,249 +1,273 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging import time from typing import ( Any, Callable, Dict, Generic, Iterable, List, NamedTuple, Optional, Type, TypeVar, ) from confluent_kafka import KafkaException, Producer from swh.journal.serializers import KeyType, key_to_kafka, pprint_key, value_to_kafka from . import ValueProtocol logger = logging.getLogger(__name__) class DeliveryTag(NamedTuple): """Unique tag allowing us to check for a message delivery""" topic: str kafka_key: bytes class DeliveryFailureInfo(NamedTuple): """Verbose information for failed deliveries""" object_type: str key: KeyType message: str code: str def get_object_type(topic: str) -> str: """Get the object type from a topic string""" return topic.rsplit(".", 1)[-1] class KafkaDeliveryError(Exception): """Delivery failed on some kafka messages.""" def __init__(self, message: str, delivery_failures: Iterable[DeliveryFailureInfo]): self.message = message self.delivery_failures = list(delivery_failures) def pretty_failures(self) -> str: return ", ".join( f"{f.object_type} {pprint_key(f.key)} ({f.message})" for f in self.delivery_failures ) def __str__(self): return f"KafkaDeliveryError({self.message}, [{self.pretty_failures()}])" TValue = TypeVar("TValue", bound=ValueProtocol) class KafkaJournalWriter(Generic[TValue]): """This class is used to write serialized versions of value objects to a series of Kafka topics. The type parameter `TValue`, which must implement the `ValueProtocol`, is the type of values this writer will write. Typically, `TValue` will be `swh.model.model.BaseModel`. Topics used to send objects representations are built from a ``prefix`` plus the type of the object: ``{prefix}.{object_type}`` Objects can be sent as is, or can be anonymized. The anonymization feature, when activated, will write anonymized versions of value objects in the main topic, and stock (non-anonymized) objects will be sent to a dedicated (privileged) set of topics: ``{prefix}_privileged.{object_type}`` The anonymization of a value object is the result of calling its ``anonymize()`` method. An object is considered anonymizable if this method returns a (non-None) value. Args: brokers: list of broker addresses and ports. prefix: the prefix used to build the topic names for objects. client_id: the id of the writer sent to kafka. value_sanitizer: a function that takes the object type and the dict representation of an object as argument, and returns an other dict that should be actually stored in the journal (eg. removing keys that do no belong there) producer_config: extra configuration keys passed to the `Producer`. flush_timeout: timeout, in seconds, after which the `flush` operation will fail if some message deliveries are still pending. producer_class: override for the kafka producer class. anonymize: if True, activate the anonymization feature. """ def __init__( self, brokers: Iterable[str], prefix: str, client_id: str, value_sanitizer: Callable[[str, Dict[str, Any]], Dict[str, Any]], producer_config: Optional[Dict] = None, flush_timeout: float = 120, producer_class: Type[Producer] = Producer, anonymize: bool = False, ): self._prefix = prefix self._prefix_privileged = f"{self._prefix}_privileged" self.anonymize = anonymize if not producer_config: producer_config = {} if "message.max.bytes" not in producer_config: producer_config = { "message.max.bytes": 100 * 1024 * 1024, **producer_config, } self.producer = producer_class( { "bootstrap.servers": ",".join(brokers), "client.id": client_id, "on_delivery": self._on_delivery, "error_cb": self._error_cb, "logger": logger, "acks": "all", **producer_config, } ) # Delivery management self.flush_timeout = flush_timeout # delivery tag -> original object "key" mapping self.deliveries_pending: Dict[DeliveryTag, KeyType] = {} # List of (object_type, key, error_msg, error_name) for failed deliveries self.delivery_failures: List[DeliveryFailureInfo] = [] self.value_sanitizer = value_sanitizer def _error_cb(self, error): if error.fatal(): raise KafkaException(error) logger.info("Received non-fatal kafka error: %s", error) def _on_delivery(self, error, message): (topic, key) = delivery_tag = DeliveryTag(message.topic(), message.key()) sent_key = self.deliveries_pending.pop(delivery_tag, None) if error is not None: self.delivery_failures.append( DeliveryFailureInfo( get_object_type(topic), sent_key, error.str(), error.name() ) ) def send(self, topic: str, key: KeyType, value): kafka_key = key_to_kafka(key) - self.producer.produce( - topic=topic, key=kafka_key, value=value_to_kafka(value), + max_attempts = 5 + last_exception: Optional[Exception] = None + for attempt in range(max_attempts): + try: + self.producer.produce( + topic=topic, key=kafka_key, value=value_to_kafka(value), + ) + except BufferError as e: + last_exception = e + wait = 1 + 3 * attempt + + if logger.isEnabledFor(logging.DEBUG): # pprint_key is expensive + logger.debug( + "BufferError producing %s %s; waiting for %ss", + get_object_type(topic), + pprint_key(kafka_key), + wait, + ) + self.producer.poll(wait) + else: + self.deliveries_pending[DeliveryTag(topic, kafka_key)] = key + return + + # We reach this point if all delivery attempts have failed + self.delivery_failures.append( + DeliveryFailureInfo( + get_object_type(topic), key, str(last_exception), "SWH_BUFFER_ERROR" + ) ) - self.deliveries_pending[DeliveryTag(topic, kafka_key)] = key - def delivery_error(self, message) -> KafkaDeliveryError: """Get all failed deliveries, and clear them""" ret = self.delivery_failures self.delivery_failures = [] while self.deliveries_pending: delivery_tag, orig_key = self.deliveries_pending.popitem() (topic, kafka_key) = delivery_tag ret.append( DeliveryFailureInfo( get_object_type(topic), orig_key, "No delivery before flush() timeout", "SWH_FLUSH_TIMEOUT", ) ) return KafkaDeliveryError(message, ret) def flush(self): start = time.monotonic() self.producer.flush(self.flush_timeout) while self.deliveries_pending: if time.monotonic() - start > self.flush_timeout: break self.producer.poll(0.1) if self.deliveries_pending: # Delivery timeout raise self.delivery_error( "flush() exceeded timeout (%ss)" % self.flush_timeout, ) elif self.delivery_failures: raise self.delivery_error("Failed deliveries after flush()") def _write_addition(self, object_type: str, object_: TValue) -> None: """Write a single object to the journal""" key = object_.unique_key() if self.anonymize: anon_object_ = object_.anonymize() if anon_object_: # can be either None, or an anonymized object # if the object is anonymizable, send the non-anonymized version in the # privileged channel topic = f"{self._prefix_privileged}.{object_type}" dict_ = self.value_sanitizer(object_type, object_.to_dict()) logger.debug("topic: %s, key: %s, value: %s", topic, key, dict_) self.send(topic, key=key, value=dict_) object_ = anon_object_ topic = f"{self._prefix}.{object_type}" dict_ = self.value_sanitizer(object_type, object_.to_dict()) logger.debug("topic: %s, key: %s, value: %s", topic, key, dict_) self.send(topic, key=key, value=dict_) def write_addition(self, object_type: str, object_: TValue) -> None: """Write a single object to the journal""" self._write_addition(object_type, object_) self.flush() write_update = write_addition def write_additions(self, object_type: str, objects: Iterable[TValue]) -> None: """Write a set of objects to the journal""" for object_ in objects: self._write_addition(object_type, object_) self.flush() diff --git a/swh/journal/writer/inmemory.py b/swh/journal/writer/stream.py similarity index 53% copy from swh/journal/writer/inmemory.py copy to swh/journal/writer/stream.py index 4272834..202e13c 100644 --- a/swh/journal/writer/inmemory.py +++ b/swh/journal/writer/stream.py @@ -1,43 +1,47 @@ -# Copyright (C) 2019 The Software Heritage developers +# Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging -from multiprocessing import Manager -from typing import Any, Generic, List, Tuple, TypeVar +from typing import Any, BinaryIO, Callable, Dict, Generic, List, TypeVar + +from swh.journal.serializers import value_to_kafka from . import ValueProtocol logger = logging.getLogger(__name__) TValue = TypeVar("TValue", bound=ValueProtocol) -class InMemoryJournalWriter(Generic[TValue]): - objects: List[Tuple[str, TValue]] - privileged_objects: List[Tuple[str, TValue]] +class StreamJournalWriter(Generic[TValue]): + """A simple JournalWriter which serializes objects in a stream + + Might be used to serialize a storage in a file to generate a test dataset. + """ - def __init__(self, value_sanitizer: Any): + def __init__( + self, + output_stream: BinaryIO, + value_sanitizer: Callable[[str, Dict[str, Any]], Dict[str, Any]], + ): # Share the list of objects across processes, for RemoteAPI tests. - self.manager = Manager() - self.objects = self.manager.list() - self.privileged_objects = self.manager.list() + self.output = output_stream + self.value_sanitizer = value_sanitizer def write_addition( self, object_type: str, object_: TValue, privileged: bool = False ) -> None: object_.unique_key() # Check this does not error, to mimic the kafka writer - if privileged: - self.privileged_objects.append((object_type, object_)) - else: - self.objects.append((object_type, object_)) + dict_ = self.value_sanitizer(object_type, object_.to_dict()) + self.output.write(value_to_kafka((object_type, dict_))) write_update = write_addition def write_additions( self, object_type: str, objects: List[TValue], privileged: bool = False ) -> None: for object_ in objects: self.write_addition(object_type, object_, privileged) diff --git a/tox.ini b/tox.ini index bcd7b3f..9c0c890 100644 --- a/tox.ini +++ b/tox.ini @@ -1,37 +1,75 @@ [tox] envlist=black,flake8,mypy,py3 [testenv] extras = testing deps = pytest-cov dev: pdbpp commands = pytest --cov={envsitepackagesdir}/swh/journal \ {envsitepackagesdir}/swh/journal \ --cov-branch \ --doctest-modules {posargs} [testenv:black] skip_install = true deps = black==19.10b0 commands = {envpython} -m black --check swh [testenv:flake8] skip_install = true deps = git+https://github.com/PyCQA/pyflakes.git flake8 commands = {envpython} -m flake8 [testenv:mypy] extras = testing deps = mypy commands = mypy swh + +# build documentation outside swh-environment using the current +# git HEAD of swh-docs, is executed on CI for each diff to prevent +# breaking doc build +[testenv:sphinx] +whitelist_externals = make +usedevelop = true +extras = + testing +deps = + # fetch and install swh-docs in develop mode + -e git+https://forge.softwareheritage.org/source/swh-docs#egg=swh.docs + +setenv = + SWH_PACKAGE_DOC_TOX_BUILD = 1 + # turn warnings into errors + SPHINXOPTS = -W +commands = + make -I ../.tox/sphinx/src/swh-docs/swh/ -C docs + + +# build documentation only inside swh-environment using local state +# of swh-docs package +[testenv:sphinx-dev] +whitelist_externals = make +usedevelop = true +extras = + testing +deps = + # install swh-docs in develop mode + -e ../swh-docs + +setenv = + SWH_PACKAGE_DOC_TOX_BUILD = 1 + # turn warnings into errors + SPHINXOPTS = -W +commands = + make -I ../.tox/sphinx-dev/src/swh-docs/swh/ -C docs