# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import logging

from swh.core.utils import grouper

from swh.scheduler.utils import create_task_dict

# Upper bound on how many origin URLs are packed into a single
# origin_metadata task; tests patch this to exercise batching.
MAX_ORIGINS_PER_TASK = 100


def process_journal_objects(messages, *, scheduler, task_names):
    """Worker function for `JournalClient.process(worker_fn)`, after
    currification of `scheduler` and `task_names`."""
    assert set(messages) == {'origin_visit'}, set(messages)
    process_origin_visits(messages['origin_visit'], scheduler, task_names)


def process_origin_visits(visits, scheduler, task_names):
    """Schedule one oneshot origin_metadata task per batch of origins
    whose visit completed with status ``full``.

    Does nothing when no 'origin_metadata' task name is configured or
    when no visit qualifies.
    """
    logging.debug('processing origin visits %r', visits)
    pending_tasks = []
    metadata_task_type = task_names.get('origin_metadata')
    if metadata_task_type:
        # Only fully-ingested origins are worth indexing.
        full_visits = [v for v in visits if v[b'status'] == b'full']
        for batch in grouper(full_visits, MAX_ORIGINS_PER_TASK):
            batch_urls = [v[b'origin'][b'url'] for v in batch]
            pending_tasks.append(create_task_dict(
                metadata_task_type, 'oneshot',
                batch_urls,
                policy_update='update-dups',
            ))
    if pending_tasks:
        scheduler.create_tasks(pending_tasks)
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import unittest
from unittest.mock import Mock, patch

from swh.indexer.journal_client import process_journal_objects


class JournalClientTest(unittest.TestCase):
    """Checks that origin-visit journal messages turn into the expected
    origin_metadata scheduler tasks, including batching behavior."""

    @staticmethod
    def _visit(url):
        # Journal messages use bytes keys/values, except the URL itself.
        return {
            b'status': b'full',
            b'origin': {
                b'url': url,
            }
        }

    @staticmethod
    def _expected_task(urls):
        # Shape of a create_task_dict() result, minus the
        # nondeterministic 'next_run' field.
        return {
            'arguments': {
                'kwargs': {'policy_update': 'update-dups'},
                'args': (urls,)
            },
            'policy': 'oneshot',
            'type': 'task-name'
        }

    def _scheduled_tasks(self, mock_scheduler):
        """Return the positional args create_tasks() was called with,
        with 'next_run' stripped from every task dict."""
        self.assertTrue(mock_scheduler.create_tasks.called)
        (args, kwargs) = mock_scheduler.create_tasks.call_args
        self.assertEqual(kwargs, {})
        for task in args[0]:
            del task['next_run']
        return args

    def testOneOriginVisit(self):
        mock_scheduler = Mock()
        messages = {
            'origin_visit': [
                self._visit('file:///dev/zero'),
            ]
        }
        process_journal_objects(
            messages, scheduler=mock_scheduler,
            task_names={'origin_metadata': 'task-name'})
        self.assertEqual(
            self._scheduled_tasks(mock_scheduler),
            ([
                self._expected_task(['file:///dev/zero']),
            ],))

    def testOneOriginVisitBatch(self):
        mock_scheduler = Mock()
        messages = {
            'origin_visit': [
                self._visit('file:///dev/zero'),
                self._visit('file:///tmp/foobar'),
            ]
        }
        process_journal_objects(
            messages, scheduler=mock_scheduler,
            task_names={'origin_metadata': 'task-name'})
        self.assertEqual(
            self._scheduled_tasks(mock_scheduler),
            ([
                self._expected_task(
                    ['file:///dev/zero', 'file:///tmp/foobar']),
            ],))

    @patch('swh.indexer.journal_client.MAX_ORIGINS_PER_TASK', 2)
    def testOriginVisitBatches(self):
        # Three full visits with the batch size patched to 2 must
        # produce two tasks: one with two origins, one with the rest.
        mock_scheduler = Mock()
        messages = {
            'origin_visit': [
                self._visit('file:///dev/zero'),
                self._visit('file:///tmp/foobar'),
                self._visit('file:///tmp/spamegg'),
            ]
        }
        process_journal_objects(
            messages, scheduler=mock_scheduler,
            task_names={'origin_metadata': 'task-name'})
        self.assertEqual(
            self._scheduled_tasks(mock_scheduler),
            ([
                self._expected_task(
                    ['file:///dev/zero', 'file:///tmp/foobar']),
                self._expected_task(['file:///tmp/spamegg']),
            ],))