Page MenuHomeSoftware Heritage

D1196.id3885.diff
No OneTemporary

D1196.id3885.diff

diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1 +1,2 @@
swh.core >= 0.0.51
+swh.storage >= 0.0.129
diff --git a/swh/scheduler/cli.py b/swh/scheduler/cli.py
--- a/swh/scheduler/cli.py
+++ b/swh/scheduler/cli.py
@@ -14,10 +14,13 @@
import datetime
from swh.core import utils, config
+from swh.storage import get_storage
+from swh.storage.algos.origin import iter_origins
+
from . import compute_nb_tasks_from
from .backend_es import SWHElasticSearchClient
from . import get_scheduler, DEFAULT_CONFIG
-from .cli_utils import parse_options
+from .cli_utils import parse_options, schedule_origin_batches
locale.setlocale(locale.LC_ALL, '')
@@ -288,6 +291,10 @@
def schedule_task(ctx, type, options, policy, priority, next_run):
"""Schedule one task from arguments.
+ The first argument is the name of the task type, further ones are
+ positional and keyword argument(s) of the task, in YAML format.
+ Keyword args are of the form key=value.
+
Usage sample:
swh-scheduler --database 'service=swh-scheduler' \
@@ -327,6 +334,54 @@
click.echo('\n'.join(output))
+@task.command('schedule_origins')
+@click.argument('type', nargs=1, required=True)
+@click.argument('options', nargs=-1)
+@click.option('--batch-size', '-b', 'origin_batch_size',
+ default=10, show_default=True, type=int,
+ help="Number of origins per task")
+@click.option('--min-id',
+ default=0, show_default=True, type=int,
+ help="Only schedule tasks for origins whose ID is greater")
+@click.option('--max-id',
+ default=None, type=int,
+ help="Only schedule tasks for origins whose ID is lower")
+@click.option('--storage-url', '-g',
+ help="URL of the (graph) storage API")
+@click.option('--dry-run/--no-dry-run', is_flag=True,
+ default=False,
+ help='List only what would be scheduled.')
+@click.pass_context
+def schedule_origin_metadata_index(
+ ctx, type, options, storage_url, origin_batch_size,
+ min_id, max_id, dry_run):
+    """Schedules tasks for origins that are already known.
+
+ The first argument is the name of the task type, further ones are
+ keyword argument(s) of the task in the form key=value, where value is
+ in YAML format.
+
+ Usage sample:
+
+ swh-scheduler --database 'service=swh-scheduler' \
+ task schedule_origins indexer_origin_metadata
+ """
+ scheduler = ctx.obj['scheduler']
+ storage = get_storage('remote', {'url': storage_url})
+ if dry_run:
+ scheduler = None
+
+ (args, kw) = parse_options(options)
+ if args:
+        raise click.ClickException('Only keyword arguments are allowed.')
+
+ origins = iter_origins(storage, origin_from=min_id, origin_to=max_id)
+ origin_ids = (origin['id'] for origin in origins)
+
+ schedule_origin_batches(
+ scheduler, type, origin_ids, origin_batch_size, kw)
+
+
@task.command('list-pending')
@click.argument('task-types', required=True, nargs=-1)
@click.option('--limit', '-l', required=False, type=click.INT,
diff --git a/swh/scheduler/cli_utils.py b/swh/scheduler/cli_utils.py
--- a/swh/scheduler/cli_utils.py
+++ b/swh/scheduler/cli_utils.py
@@ -3,9 +3,51 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import itertools
+
import click
import yaml
+from .utils import create_task_dict
+
+TASK_BATCH_SIZE = 1000 # Number of tasks per query to the scheduler
+
+
+def schedule_origin_batches(
+ scheduler, task_type, origins, origin_batch_size, kwargs):
+ nb_origins = 0
+ nb_tasks = 0
+
+ while True:
+ task_batch = []
+ for _ in range(TASK_BATCH_SIZE):
+ # Group origins
+ origin_batch = []
+            for origin in itertools.islice(origins, origin_batch_size):
+ origin_batch.append(origin)
+ nb_origins += len(origin_batch)
+ if not origin_batch:
+ break
+
+ # Create a task for these origins
+ args = [origin_batch]
+ task_dict = create_task_dict(task_type, 'oneshot', *args, **kwargs)
+ task_batch.append(task_dict)
+
+ # Schedule a batch of tasks
+ if not task_batch:
+ break
+ nb_tasks += len(task_batch)
+ if scheduler:
+ scheduler.create_tasks(task_batch)
+ click.echo('Scheduled %d tasks (%d origins).' % (nb_tasks, nb_origins))
+
+ # Print final status.
+ if nb_tasks:
+ click.echo('Done.')
+ else:
+ click.echo('Nothing to do (no origin metadata matched the criteria).')
+
def parse_options(options):
"""Parses options from a CLI as YAML and turns it into Python
diff --git a/swh/scheduler/tests/test_cli.py b/swh/scheduler/tests/test_cli.py
--- a/swh/scheduler/tests/test_cli.py
+++ b/swh/scheduler/tests/test_cli.py
@@ -9,6 +9,9 @@
from unittest.mock import patch
from click.testing import CliRunner
+import pytest
+
+from swh.storage.in_memory import Storage
from swh.scheduler.cli import cli
from swh.scheduler.utils import create_task_dict
@@ -189,3 +192,107 @@
'''.lstrip()
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+
+
+def _fill_storage_with_origins(storage, nb_origins):
+ storage.origin_add([
+ {
+ 'type': 'type{}'.format(i),
+ 'url': 'http://example.com/{}'.format(i),
+ }
+ for i in range(nb_origins)
+ ])
+
+
+@pytest.fixture
+def storage():
+ """An instance of swh.storage.in_memory.Storage that gets injected
+ into the CLI functions."""
+ storage = Storage()
+ with patch('swh.scheduler.cli.get_storage') as get_storage_mock:
+ get_storage_mock.return_value = storage
+ yield storage
+
+
+@patch('swh.scheduler.cli_utils.TASK_BATCH_SIZE', 3)
+def test_task_schedule_origins_dry_run(
+ swh_scheduler, storage):
+ """Tests the scheduling when origin_batch_size*task_batch_size is a
+ divisor of nb_origins."""
+ _fill_storage_with_origins(storage, 90)
+
+ result = invoke(swh_scheduler, False, [
+ 'task', 'schedule_origins', '--dry-run', 'swh-test-ping',
+ ])
+
+ # Check the output
+ expected = r'''^\[INFO\] swh.core.config -- Loading config file .*
+Scheduled 3 tasks \(30 origins\).
+Scheduled 6 tasks \(60 origins\).
+Scheduled 9 tasks \(90 origins\).
+Done.
+'''
+ assert result.exit_code == 0, result.output
+ assert re.match(expected, result.output, re.MULTILINE), repr(result.output)
+
+ # Check scheduled tasks
+ tasks = swh_scheduler.search_tasks()
+ assert len(tasks) == 0
+
+
+@patch('swh.scheduler.cli_utils.TASK_BATCH_SIZE', 3)
+def test_task_schedule_origins(swh_scheduler, storage):
+    """Tests the scheduling when neither origin_batch_size nor
+ task_batch_size is a divisor of nb_origins."""
+ _fill_storage_with_origins(storage, 70)
+
+ result = invoke(swh_scheduler, False, [
+ 'task', 'schedule_origins', 'swh-test-ping',
+ '--batch-size', '20',
+ ])
+
+ # Check the output
+ expected = r'''^\[INFO\] swh.core.config -- Loading config file .*
+Scheduled 3 tasks \(60 origins\).
+Scheduled 4 tasks \(70 origins\).
+Done.
+'''
+ assert result.exit_code == 0, result.output
+ assert re.match(expected, result.output, re.MULTILINE), repr(result.output)
+
+ # Check scheduled tasks
+ tasks = swh_scheduler.search_tasks()
+ assert len(tasks) == 4
+ assert tasks[0]['arguments']['args'] == [list(range(1, 21))]
+ assert tasks[1]['arguments']['args'] == [list(range(21, 41))]
+ assert tasks[2]['arguments']['args'] == [list(range(41, 61))]
+ assert tasks[3]['arguments']['args'] == [list(range(61, 71))]
+ assert all(task['arguments']['kwargs'] == {} for task in tasks)
+
+
+def test_task_schedule_origins_kwargs(swh_scheduler, storage):
+ """Tests support of extra keyword-arguments."""
+ _fill_storage_with_origins(storage, 30)
+
+ result = invoke(swh_scheduler, False, [
+ 'task', 'schedule_origins', 'swh-test-ping',
+ '--batch-size', '20',
+ 'key1="value1"', 'key2="value2"',
+ ])
+
+ # Check the output
+ expected = r'''^\[INFO\] swh.core.config -- Loading config file .*
+Scheduled 2 tasks \(30 origins\).
+Done.
+'''
+ assert result.exit_code == 0, result.output
+ assert re.match(expected, result.output, re.MULTILINE), repr(result.output)
+
+ # Check scheduled tasks
+ tasks = swh_scheduler.search_tasks()
+ assert len(tasks) == 2
+ assert tasks[0]['arguments']['args'] == [list(range(1, 21))]
+ assert tasks[1]['arguments']['args'] == [list(range(21, 31))]
+ assert all(task['arguments']['kwargs'] ==
+ {'key1': 'value1', 'key2': 'value2'}
+ for task in tasks)

File Metadata

Mime Type
text/plain
Expires
Nov 5 2024, 6:37 PM (12 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3225797

Event Timeline