Split the journal client's object types into defaults and accepted types.

DEFAULT_OBJECT_TYPES lists the SWH model node types and is what a
JournalClient subscribes to when `object_types` is unset.
ACCEPTED_OBJECT_TYPES is now DEFAULT_OBJECT_TYPES plus
'origin_intrinsic_metadata'. The existing test is renamed to
test_client_objects, and test_client_metadata is added to exercise a
client subscribed to 'origin_intrinsic_metadata'.

diff --git a/swh/journal/client.py b/swh/journal/client.py
--- a/swh/journal/client.py
+++ b/swh/journal/client.py
@@ -18,8 +18,8 @@
 # Only accepted offset reset policy accepted
 ACCEPTED_OFFSET_RESET = ['earliest', 'latest']
 
-# Only accepted object types
-ACCEPTED_OBJECT_TYPES = [
+# Default object types, which are the node types in the SWH model
+DEFAULT_OBJECT_TYPES = [
     'content',
     'directory',
     'revision',
@@ -29,6 +29,11 @@
     'origin_visit'
 ]
 
+# Only accepted object types
+ACCEPTED_OBJECT_TYPES = DEFAULT_OBJECT_TYPES + [
+    'origin_intrinsic_metadata',
+]
+
 
 class JournalClient:
     """A base client for the Software Heritage journal.
@@ -40,7 +45,8 @@
     'swh.journal.objects'.
 
     Clients subscribe to events specific to each object type as listed in the
-    `object_types` argument (if unset, defaults to all accepted object types).
+    `object_types` argument (if unset, defaults to `DEFAULT_OBJECT_TYPES`,
+    all object types in the SWH model).
 
     Clients can be sharded by setting the `group_id` to a common
     value across instances. The journal will share the message
@@ -59,7 +65,7 @@
         if prefix is None:
             prefix = DEFAULT_PREFIX
         if object_types is None:
-            object_types = ACCEPTED_OBJECT_TYPES
+            object_types = DEFAULT_OBJECT_TYPES
         if auto_offset_reset not in ACCEPTED_OFFSET_RESET:
             raise ValueError(
                 'Option \'auto_offset_reset\' only accept %s, not %s' %
diff --git a/swh/journal/tests/test_client.py b/swh/journal/tests/test_client.py
--- a/swh/journal/tests/test_client.py
+++ b/swh/journal/tests/test_client.py
@@ -15,7 +15,7 @@
 from swh.journal.serializers import key_to_kafka, value_to_kafka
 
 
-def test_client(
+def test_client_objects(
         kafka_prefix: str,
         kafka_server: Tuple[Popen, int]):
     (_, port) = kafka_server
@@ -48,3 +48,53 @@
     client.process(worker_fn)
 
     worker_fn.assert_called_once_with({'revision': [rev.to_dict()]})
+
+
+def test_client_metadata(
+        kafka_prefix: str,
+        kafka_server: Tuple[Popen, int]):
+    (_, port) = kafka_server
+    kafka_prefix += '.swh.journal.metadata'
+
+    producer = Producer({
+        'bootstrap.servers': 'localhost:{}'.format(port),
+        'client.id': 'test producer',
+        'enable.idempotence': 'true',
+    })
+
+    tool = {
+        b'name': b'some-tool',
+        b'version': b'some-version',
+        b'configuration': {b'debian-package': b'some-package'},
+    }
+    key = {
+        b'origin': b'http://example.org/',
+        b'tool': tool,
+    }
+    value = {
+        '@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
+        'type': 'SoftwareSourceCode',
+        'author': ['Someone'],
+    }
+
+    # Fill Kafka
+    producer.produce(
+        topic=kafka_prefix + '.origin_intrinsic_metadata',
+        key=key_to_kafka(key),
+        value=value_to_kafka(value),
+    )
+    producer.flush()
+
+    config = {
+        'brokers': 'localhost:%d' % kafka_server[1],
+        'group_id': 'replayer',
+        'prefix': kafka_prefix,
+        'max_messages': 1,
+        'object_types': ['origin_intrinsic_metadata'],
+    }
+    client = JournalClient(**config)
+
+    worker_fn = MagicMock()
+    client.process(worker_fn)
+
+    worker_fn.assert_called_once_with({'origin_intrinsic_metadata': [value]})