Page Menu | Home | Software Heritage

D1196.id3923.diff
No One | Temporary

D1196.id3923.diff

diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1 +1,2 @@
swh.core >= 0.0.51
+swh.storage >= 0.0.129
diff --git a/swh/scheduler/cli.py b/swh/scheduler/cli.py
--- a/swh/scheduler/cli.py
+++ b/swh/scheduler/cli.py
@@ -14,10 +14,13 @@
import datetime
from swh.core import utils, config
+from swh.storage import get_storage
+from swh.storage.algos.origin import iter_origins
+
from . import compute_nb_tasks_from
from .backend_es import SWHElasticSearchClient
from . import get_scheduler, DEFAULT_CONFIG
-from .cli_utils import parse_options
+from .cli_utils import parse_options, schedule_origin_batches
locale.setlocale(locale.LC_ALL, '')
@@ -288,6 +291,10 @@
def schedule_task(ctx, type, options, policy, priority, next_run):
"""Schedule one task from arguments.
+ The first argument is the name of the task type, further ones are
+ positional and keyword argument(s) of the task, in YAML format.
+ Keyword args are of the form key=value.
+
Usage sample:
swh-scheduler --database 'service=swh-scheduler' \
@@ -327,6 +334,54 @@
click.echo('\n'.join(output))
+@task.command('schedule_origins')
+@click.argument('type', nargs=1, required=True)
+@click.argument('options', nargs=-1)
+@click.option('--batch-size', '-b', 'origin_batch_size',
+ default=10, show_default=True, type=int,
+ help="Number of origins per task")
+@click.option('--min-id',
+ default=0, show_default=True, type=int,
+ help="Only schedule tasks for origins whose ID is greater")
+@click.option('--max-id',
+ default=None, type=int,
+ help="Only schedule tasks for origins whose ID is lower")
+@click.option('--storage-url', '-g',
+ help="URL of the (graph) storage API")
+@click.option('--dry-run/--no-dry-run', is_flag=True,
+ default=False,
+ help='List only what would be scheduled.')
+@click.pass_context
+def schedule_origin_metadata_index(
+ ctx, type, options, storage_url, origin_batch_size,
+ min_id, max_id, dry_run):
+ """Schedules tasks for origins that are already known.
+
+ The first argument is the name of the task type, further ones are
+ keyword argument(s) of the task in the form key=value, where value is
+ in YAML format.
+
+ Usage sample:
+
+ swh-scheduler --database 'service=swh-scheduler' \
+ task schedule_origins indexer_origin_metadata
+ """
+ scheduler = ctx.obj['scheduler']
+ storage = get_storage('remote', {'url': storage_url})
+ if dry_run:
+ scheduler = None
+
+ (args, kw) = parse_options(options)
+ if args:
+ raise click.ClickException('Only keywords arguments are allowed.')
+
+ origins = iter_origins(storage, origin_from=min_id, origin_to=max_id)
+ origin_ids = (origin['id'] for origin in origins)
+
+ schedule_origin_batches(
+ scheduler, type, origin_ids, origin_batch_size, kw)
+
+
@task.command('list-pending')
@click.argument('task-types', required=True, nargs=-1)
@click.option('--limit', '-l', required=False, type=click.INT,
diff --git a/swh/scheduler/cli_utils.py b/swh/scheduler/cli_utils.py
--- a/swh/scheduler/cli_utils.py
+++ b/swh/scheduler/cli_utils.py
@@ -3,9 +3,51 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import itertools
+
import click
import yaml
+from .utils import create_task_dict
+
+TASK_BATCH_SIZE = 1000 # Number of tasks per query to the scheduler
+
+
+def schedule_origin_batches(
+ scheduler, task_type, origins, origin_batch_size, kwargs):
+ nb_origins = 0
+ nb_tasks = 0
+
+ while True:
+ task_batch = []
+ for _ in range(TASK_BATCH_SIZE):
+ # Group origins
+ origin_batch = []
+ for origin in itertools.islice(origins, origin_batch_size):
+ origin_batch.append(origin)
+ nb_origins += len(origin_batch)
+ if not origin_batch:
+ break
+
+ # Create a task for these origins
+ args = [origin_batch]
+ task_dict = create_task_dict(task_type, 'oneshot', *args, **kwargs)
+ task_batch.append(task_dict)
+
+ # Schedule a batch of tasks
+ if not task_batch:
+ break
+ nb_tasks += len(task_batch)
+ if scheduler:
+ scheduler.create_tasks(task_batch)
+ click.echo('Scheduled %d tasks (%d origins).' % (nb_tasks, nb_origins))
+
+ # Print final status.
+ if nb_tasks:
+ click.echo('Done.')
+ else:
+ click.echo('Nothing to do (no origin metadata matched the criteria).')
+
def parse_argument(option):
try:
diff --git a/swh/scheduler/tests/test_cli.py b/swh/scheduler/tests/test_cli.py
--- a/swh/scheduler/tests/test_cli.py
+++ b/swh/scheduler/tests/test_cli.py
@@ -11,6 +11,8 @@
from click.testing import CliRunner
import pytest
+from swh.storage.in_memory import Storage
+
from swh.scheduler.cli import cli
from swh.scheduler.utils import create_task_dict
@@ -29,7 +31,8 @@
config_fd.write(CLI_CONFIG)
config_fd.seek(0)
get_scheduler_mock.return_value = scheduler
- result = runner.invoke(cli, ['-C' + config_fd.name] + args)
+ args = ['-C' + config_fd.name, '-l', 'WARNING'] + args
+ result = runner.invoke(cli, args)
if not catch_exceptions and result.exception:
print(result.output)
raise result.exception
@@ -51,7 +54,6 @@
csv_fd.name
])
expected = r'''
-\[INFO\] swh.core.config -- Loading config file .*
Created 2 tasks
Task 1
@@ -91,7 +93,6 @@
csv_fd.name
])
expected = r'''
-\[INFO\] swh.core.config -- Loading config file .*
Created 1 tasks
Task 1
@@ -116,7 +117,6 @@
'swh-test-ping', 'arg1', 'arg2', 'key=value',
])
expected = r'''
-\[INFO\] swh.core.config -- Loading config file .*
Created 1 tasks
Task 1
@@ -141,7 +141,6 @@
])
expected = r'''
-\[INFO\] swh.core.config -- Loading config file .*
Found 0 swh-test-ping tasks
'''.lstrip()
@@ -160,7 +159,6 @@
])
expected = r'''
-\[INFO\] swh.core.config -- Loading config file .*
Found 1 swh-test-ping tasks
Task 1
@@ -183,7 +181,6 @@
])
expected = r'''
-\[INFO\] swh.core.config -- Loading config file .*
Found 0 swh-test-ping tasks
'''.lstrip()
@@ -200,7 +197,6 @@
])
expected = r'''
-\[INFO\] swh.core.config -- Loading config file .*
Found 0 swh-test-ping tasks
'''.lstrip()
@@ -219,7 +215,6 @@
])
expected = r'''
-\[INFO\] swh.core.config -- Loading config file .*
Found 1 swh-test-ping tasks
Task 2
@@ -250,7 +245,6 @@
])
expected = r'''
-\[INFO\] swh.core.config -- Loading config file .*
Found 2 swh-test-ping tasks
Task 1
@@ -298,7 +292,6 @@
])
expected = r'''
-\[INFO\] swh.core.config -- Loading config file .*
Found 1 swh-test-ping tasks
Task 2
@@ -328,7 +321,6 @@
])
expected = r'''
-\[INFO\] swh.core.config -- Loading config file .*
Found 2 tasks
Task 1
@@ -369,7 +361,6 @@
])
expected = r'''
-\[INFO\] swh.core.config -- Loading config file .*
Found 1 tasks
Task 2
@@ -399,7 +390,6 @@
])
expected = r'''
-\[INFO\] swh.core.config -- Loading config file .*
Found 2 tasks
Task 2
@@ -440,7 +430,6 @@
])
expected = r'''
-\[INFO\] swh.core.config -- Loading config file .*
Found 2 tasks
Task 1
@@ -481,7 +470,6 @@
])
expected = r'''
-\[INFO\] swh.core.config -- Loading config file .*
Found 2 tasks
Task 1
@@ -525,7 +513,6 @@
])
expected = r'''
-\[INFO\] swh.core.config -- Loading config file .*
Found 1 tasks
Task 2
@@ -558,7 +545,6 @@
])
expected = r'''
-\[INFO\] swh.core.config -- Loading config file .*
Found 1 tasks
Task 1
@@ -575,3 +561,110 @@
'''.lstrip()
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+
+
+def _fill_storage_with_origins(storage, nb_origins):
+ storage.origin_add([
+ {
+ 'type': 'type{}'.format(i),
+ 'url': 'http://example.com/{}'.format(i),
+ }
+ for i in range(nb_origins)
+ ])
+
+
+@pytest.fixture
+def storage():
+ """An instance of swh.storage.in_memory.Storage that gets injected
+ into the CLI functions."""
+ storage = Storage()
+ with patch('swh.scheduler.cli.get_storage') as get_storage_mock:
+ get_storage_mock.return_value = storage
+ yield storage
+
+
+@patch('swh.scheduler.cli_utils.TASK_BATCH_SIZE', 3)
+def test_task_schedule_origins_dry_run(
+ swh_scheduler, storage):
+ """Tests dry-run scheduling when origin_batch_size*task_batch_size is a
+ divisor of nb_origins: batches are reported but no task is created."""
+ _fill_storage_with_origins(storage, 90)
+
+ result = invoke(swh_scheduler, False, [
+ 'task', 'schedule_origins', '--dry-run', 'swh-test-ping',
+ ])
+
+ # Check the output
+ expected = r'''
+Scheduled 3 tasks \(30 origins\).
+Scheduled 6 tasks \(60 origins\).
+Scheduled 9 tasks \(90 origins\).
+Done.
+'''.lstrip()
+ assert result.exit_code == 0, result.output
+ assert re.fullmatch(expected, result.output, re.MULTILINE), \
+ repr(result.output)
+
+ # Check scheduled tasks
+ tasks = swh_scheduler.search_tasks()
+ assert len(tasks) == 0
+
+
+@patch('swh.scheduler.cli_utils.TASK_BATCH_SIZE', 3)
+def test_task_schedule_origins(swh_scheduler, storage):
+ """Tests the scheduling when neither origin_batch_size nor
+ task_batch_size is a divisor of nb_origins."""
+ _fill_storage_with_origins(storage, 70)
+
+ result = invoke(swh_scheduler, False, [
+ 'task', 'schedule_origins', 'swh-test-ping',
+ '--batch-size', '20',
+ ])
+
+ # Check the output
+ expected = r'''
+Scheduled 3 tasks \(60 origins\).
+Scheduled 4 tasks \(70 origins\).
+Done.
+'''.lstrip()
+ assert result.exit_code == 0, result.output
+ assert re.fullmatch(expected, result.output, re.MULTILINE), \
+ repr(result.output)
+
+ # Check scheduled tasks
+ tasks = swh_scheduler.search_tasks()
+ assert len(tasks) == 4
+ assert tasks[0]['arguments']['args'] == [list(range(1, 21))]
+ assert tasks[1]['arguments']['args'] == [list(range(21, 41))]
+ assert tasks[2]['arguments']['args'] == [list(range(41, 61))]
+ assert tasks[3]['arguments']['args'] == [list(range(61, 71))]
+ assert all(task['arguments']['kwargs'] == {} for task in tasks)
+
+
+def test_task_schedule_origins_kwargs(swh_scheduler, storage):
+ """Tests support of extra keyword-arguments."""
+ _fill_storage_with_origins(storage, 30)
+
+ result = invoke(swh_scheduler, False, [
+ 'task', 'schedule_origins', 'swh-test-ping',
+ '--batch-size', '20',
+ 'key1="value1"', 'key2="value2"',
+ ])
+
+ # Check the output
+ expected = r'''
+Scheduled 2 tasks \(30 origins\).
+Done.
+'''.lstrip()
+ assert result.exit_code == 0, result.output
+ assert re.fullmatch(expected, result.output, re.MULTILINE), \
+ repr(result.output)
+
+ # Check scheduled tasks
+ tasks = swh_scheduler.search_tasks()
+ assert len(tasks) == 2
+ assert tasks[0]['arguments']['args'] == [list(range(1, 21))]
+ assert tasks[1]['arguments']['args'] == [list(range(21, 31))]
+ assert all(task['arguments']['kwargs'] ==
+ {'key1': 'value1', 'key2': 'value2'}
+ for task in tasks)

File Metadata

Mime Type
text/plain
Expires
Nov 5 2024, 4:29 PM (11 w, 16 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3223562

Event Timeline