diff --git a/swh/indexer/journal_client.py b/swh/indexer/journal_client.py
index 90aff7b..642ee1c 100644
--- a/swh/indexer/journal_client.py
+++ b/swh/indexer/journal_client.py
@@ -1,37 +1,43 @@
 # Copyright (C) 2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import logging
 
 from swh.core.utils import grouper
 from swh.scheduler.utils import create_task_dict
 
 MAX_ORIGINS_PER_TASK = 100
 
 
 def process_journal_objects(messages, *, scheduler, task_names):
     """Worker function for `JournalClient.process(worker_fn)`, after
     currification of `scheduler` and `task_names`."""
     assert set(messages) == {'origin_visit'}, set(messages)
     process_origin_visits(messages['origin_visit'], scheduler, task_names)
 
 
 def process_origin_visits(visits, scheduler, task_names):
     task_dicts = []
     logging.debug('processing origin visits %r', visits)
 
     if task_names.get('origin_metadata'):
         visits = [visit for visit in visits if visit['status'] == 'full']
         visit_batches = grouper(visits, MAX_ORIGINS_PER_TASK)
         for visit_batch in visit_batches:
+            visit_urls = []
+            for visit in visit_batch:
+                if isinstance(visit['origin'], str):
+                    visit_urls.append(visit['origin'])
+                else:
+                    visit_urls.append(visit['origin']['url'])
             task_dicts.append(create_task_dict(
                 task_names['origin_metadata'],
                 'oneshot',
-                [visit['origin']['url'] for visit in visit_batch],
+                visit_urls,
                 policy_update='update-dups',
             ))
 
     if task_dicts:
         scheduler.create_tasks(task_dicts)
diff --git a/swh/indexer/tests/test_journal_client.py b/swh/indexer/tests/test_journal_client.py
index f0b9acf..23f3cb5 100644
--- a/swh/indexer/tests/test_journal_client.py
+++ b/swh/indexer/tests/test_journal_client.py
@@ -1,132 +1,151 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import unittest
 from unittest.mock import Mock, patch
 
 from swh.indexer.journal_client import process_journal_objects
 
 
 class JournalClientTest(unittest.TestCase):
     def testOneOriginVisit(self):
+        mock_scheduler = Mock()
+        messages = {
+            'origin_visit': [
+                {
+                    'status': 'full',
+                    'origin': 'file:///dev/zero',
+                },
+            ]
+        }
+        process_journal_objects(
+            messages, scheduler=mock_scheduler,
+            task_names={'origin_metadata': 'task-name'})
+        self.assertTrue(mock_scheduler.create_tasks.called)
+        call_args = mock_scheduler.create_tasks.call_args
+        (args, kwargs) = call_args
+        self.assertEqual(kwargs, {})
+        del args[0][0]['next_run']
+        self.assertEqual(args, ([
+            {
+                'arguments': {
+                    'kwargs': {'policy_update': 'update-dups'},
+                    'args': (['file:///dev/zero'],)
+                },
+                'policy': 'oneshot',
+                'type': 'task-name'
+            },
+        ],))
+
+    def testOriginVisitLegacy(self):
         mock_scheduler = Mock()
         messages = {
             'origin_visit': [
                 {
                     'status': 'full',
                     'origin': {
                         'url': 'file:///dev/zero',
                     }
                 },
             ]
         }
         process_journal_objects(
             messages, scheduler=mock_scheduler,
             task_names={'origin_metadata': 'task-name'})
         self.assertTrue(mock_scheduler.create_tasks.called)
         call_args = mock_scheduler.create_tasks.call_args
         (args, kwargs) = call_args
         self.assertEqual(kwargs, {})
         del args[0][0]['next_run']
         self.assertEqual(args, ([
             {
                 'arguments': {
                     'kwargs': {'policy_update': 'update-dups'},
                     'args': (['file:///dev/zero'],)
                 },
                 'policy': 'oneshot',
                 'type': 'task-name'
             },
         ],))
 
     def testOneOriginVisitBatch(self):
         mock_scheduler = Mock()
         messages = {
             'origin_visit': [
                 {
                     'status': 'full',
-                    'origin': {
-                        'url': 'file:///dev/zero',
-                    }
+                    'origin': 'file:///dev/zero',
                 },
                 {
                     'status': 'full',
-                    'origin': {
-                        'url': 'file:///tmp/foobar',
-                    }
+                    'origin': 'file:///tmp/foobar',
                 },
             ]
         }
         process_journal_objects(
             messages, scheduler=mock_scheduler,
             task_names={'origin_metadata': 'task-name'})
         self.assertTrue(mock_scheduler.create_tasks.called)
         call_args = mock_scheduler.create_tasks.call_args
         (args, kwargs) = call_args
         self.assertEqual(kwargs, {})
         del args[0][0]['next_run']
         self.assertEqual(args, ([
             {
                 'arguments': {
                     'kwargs': {'policy_update': 'update-dups'},
                     'args': (['file:///dev/zero', 'file:///tmp/foobar'],)
                 },
                 'policy': 'oneshot',
                 'type': 'task-name'
             },
         ],))
 
     @patch('swh.indexer.journal_client.MAX_ORIGINS_PER_TASK', 2)
     def testOriginVisitBatches(self):
         mock_scheduler = Mock()
         messages = {
             'origin_visit': [
                 {
                     'status': 'full',
-                    'origin': {
-                        'url': 'file:///dev/zero',
-                    }
+                    'origin': 'file:///dev/zero',
                 },
                 {
                     'status': 'full',
-                    'origin': {
-                        'url': 'file:///tmp/foobar',
-                    }
+                    'origin': 'file:///tmp/foobar',
                 },
                 {
                     'status': 'full',
-                    'origin': {
-                        'url': 'file:///tmp/spamegg',
-                    }
+                    'origin': 'file:///tmp/spamegg',
                 },
             ]
         }
         process_journal_objects(
             messages, scheduler=mock_scheduler,
             task_names={'origin_metadata': 'task-name'})
         self.assertTrue(mock_scheduler.create_tasks.called)
         call_args = mock_scheduler.create_tasks.call_args
         (args, kwargs) = call_args
         self.assertEqual(kwargs, {})
         del args[0][0]['next_run']
         del args[0][1]['next_run']
         self.assertEqual(args, ([
             {
                 'arguments': {
                     'kwargs': {'policy_update': 'update-dups'},
                     'args': (['file:///dev/zero', 'file:///tmp/foobar'],)
                 },
                 'policy': 'oneshot',
                 'type': 'task-name'
             },
             {
                 'arguments': {
                     'kwargs': {'policy_update': 'update-dups'},
                     'args': (['file:///tmp/spamegg'],)
                 },
                 'policy': 'oneshot',
                 'type': 'task-name'
             },
         ],))