diff --git a/debian/control b/debian/control --- a/debian/control +++ b/debian/control @@ -9,12 +9,13 @@ python3-nose, python3-setuptools, python3-swh.core, - python3-swh.storage, + python3-swh.storage (>= 0.0.108~), python3-vcversioner Standards-Version: 3.9.6 Homepage: https://forge.softwareheritage.org/diffusion/DJNL/ Package: python3-swh.journal Architecture: all -Depends: python3-kafka (>= 1.3), ${misc:Depends}, ${python3:Depends} +Depends: python3-kafka (>= 1.3), python3-swh.storage (>= 0.0.108~), + ${misc:Depends}, ${python3:Depends} Description: Software Heritage Journal utilities diff --git a/requirements-swh.txt b/requirements-swh.txt --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,2 +1,2 @@ swh.core -swh.storage +swh.storage >= 0.0.108 diff --git a/swh/journal/publisher.py b/swh/journal/publisher.py --- a/swh/journal/publisher.py +++ b/swh/journal/publisher.py @@ -10,11 +10,22 @@ from swh.core.config import SWHConfig from swh.storage import get_storage +from swh.storage.algos import snapshot from .serializers import kafka_to_key, key_to_kafka class SWHJournalPublisher(SWHConfig): + """The journal publisher is a layer in charge of: + + - consuming messages from topics (1 topic per object_type) + - reify the object ids read from those topics (using the storage) + - producing those reified objects to output topics (1 topic per + object type) + + The main entry point for this class is the 'poll' method. + + """ DEFAULT_CONFIG = { 'brokers': ('list[str]', ['getty.internal.softwareheritage.org']), @@ -49,7 +60,10 @@ self.max_messages = self.config['max_messages'] def _prepare_journal(self, config): + """Prepare the consumer and subscriber instances for the publisher to + actually be able to discuss with the journal. 
+        """
         # yes, the temporary topics contain values that are actually _keys_
         self.consumer = KafkaConsumer(
             bootstrap_servers=config['brokers'],
             value_deserializer=kafka_to_key,
             auto_offset_reset='earliest',
             enable_auto_commit=False,
             group_id=config['consumer_id'],
         )
@@ -71,17 +85,33 @@
         )
 
     def _prepare_storage(self, config):
+        """Prepare the storage instance needed for the publisher to be able to
+        communicate with the storage and retrieve the objects.
+
+        """
         self.storage = get_storage(**config['storage'])
 
-    def poll(self):
-        """Process a batch of messages"""
-        num = 0
+    def poll(self, max_messages=None):
+        """Process a batch of messages from the consumer's topics. Use the
+        storage to reify those ids. Produce back those reified
+        objects to the production topics.
+
+        This method polls a given number of messages, then stops.
+        The number of messages to consume is either provided or
+        configured as fallback.
+
+        This method is expected to be called from within a
+        loop.
+
+        """
         messages = defaultdict(list)
+        if max_messages is None:
+            max_messages = self.max_messages
         for num, message in enumerate(self.consumer):
             object_type = message.topic.split('.')[-1]
             messages[object_type].append(message.value)
-            if num >= self.max_messages:
+            if num >= max_messages:
                 break
 
         new_objects = self.process_objects(messages)
@@ -89,10 +119,25 @@
         self.consumer.commit()
 
     def process_objects(self, messages):
+        """Given a dict of messages {object type: [object id]}, reify those
+        ids into swh objects from the storage and return a
+        corresponding dict.
+
+        Args:
+            messages (dict): Dict of {object_type: [id-as-bytes]}
+
+        Returns:
+            Dict of {object_type: [tuple]}.
+
+            object_type (str): content, revision, release, snapshot
+            tuple (bytes, dict): object id as bytes, object as swh dict.
+
+        """
         processors = {
             'content': self.process_contents,
             'revision': self.process_revisions,
             'release': self.process_releases,
+            'snapshot': self.process_snapshots,
        }

        return {
@@ -101,6 +146,15 @@
        }

    def produce_messages(self, messages):
+        """Produce new swh objects to the production topics.
+ + Args: + messages ([dict]): Dict of {object_type: [tuple]}. + + object_type (str): content, revision, release + tuple (bytes, dict): object id as bytes, object as swh dict. + + """ for object_type, objects in messages.items(): topic = '%s.%s' % (self.config['final_prefix'], object_type) for key, object in objects: @@ -121,6 +175,15 @@ metadata = self.storage.release_get((r[b'id'] for r in release_objs)) return [(release['id'], release) for release in metadata] + def process_snapshots(self, snapshot_objs): + metadata = [] + for snap in snapshot_objs: + full_obj = snapshot.snapshot_get_all_branches( + self.storage, snap[b'id']) + metadata.append((full_obj['id'], full_obj)) + + return metadata + if __name__ == '__main__': logging.basicConfig( diff --git a/swh/journal/tests/test_publisher.py b/swh/journal/tests/test_publisher.py --- a/swh/journal/tests/test_publisher.py +++ b/swh/journal/tests/test_publisher.py @@ -15,6 +15,7 @@ 'content': 'sha1', 'revision': 'id', 'release': 'id', + 'snapshot': 'id', } def __init__(self, state): @@ -66,6 +67,11 @@ def release_get(self, releases): return self.get('release', releases) + def snapshot_get(self, snapshot_id): + snaps = self.get('snapshot', [snapshot_id]) + if snaps: + return snaps[0] + CONTENTS = [ { @@ -95,6 +101,23 @@ }, ] +SNAPSHOTS = [ + { + 'id': hash_to_bytes('c932c7649c6dfa4b82327d121215116909eb3bea'), + 'branches': { + 'HEAD': { + 'target_type': 'alias', + 'target': 'refs/heads/master', + }, + 'refs/heads/master': { + 'target': hash_to_bytes( + 'd81cc0710eb6cf9efd5b920a8453e1e07157b6cd'), + 'target_type': 'revision', + } + } + }, +] + class JournalPublisherTest(SWHJournalPublisher): def parse_config_file(self): @@ -113,6 +136,7 @@ 'content': CONTENTS, 'revision': REVISIONS, 'release': RELEASES, + 'snapshot': SNAPSHOTS, }) def _prepare_journal(self, config): @@ -128,6 +152,7 @@ self.contents = [{b'sha1': c['sha1']} for c in CONTENTS] self.revisions = [{b'id': c['id']} for c in REVISIONS] self.releases = [{b'id': 
c['id']} for c in RELEASES] + self.snapshots = [{b'id': s['id']} for s in SNAPSHOTS] def test_process_contents(self): actual_contents = self.publisher.process_contents(self.contents) @@ -144,11 +169,17 @@ expected_releases = [(c['id'], c) for c in RELEASES] self.assertEqual(actual_releases, expected_releases) + def test_process_snapshots(self): + actual_snapshots = self.publisher.process_snapshots(self.snapshots) + expected_snapshots = [(s['id'], s) for s in SNAPSHOTS] + self.assertEqual(actual_snapshots, expected_snapshots) + def test_process_objects(self): messages = { 'content': self.contents, 'revision': self.revisions, 'release': self.releases, + 'snapshot': self.snapshots, } actual_objects = self.publisher.process_objects(messages) @@ -156,11 +187,13 @@ expected_contents = [(c['sha1'], c) for c in CONTENTS] expected_revisions = [(c['id'], c) for c in REVISIONS] expected_releases = [(c['id'], c) for c in RELEASES] + expected_snapshots = [(s['id'], s) for s in SNAPSHOTS] expected_objects = { 'content': expected_contents, 'revision': expected_revisions, 'release': expected_releases, + 'snapshot': expected_snapshots, } self.assertEqual(actual_objects, expected_objects)