Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7066632
D1196.id3923.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
11 KB
Subscribers
None
D1196.id3923.diff
View Options
diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1 +1,2 @@
swh.core >= 0.0.51
+swh.storage >= 0.0.129
diff --git a/swh/scheduler/cli.py b/swh/scheduler/cli.py
--- a/swh/scheduler/cli.py
+++ b/swh/scheduler/cli.py
@@ -14,10 +14,13 @@
import datetime
from swh.core import utils, config
+from swh.storage import get_storage
+from swh.storage.algos.origin import iter_origins
+
from . import compute_nb_tasks_from
from .backend_es import SWHElasticSearchClient
from . import get_scheduler, DEFAULT_CONFIG
-from .cli_utils import parse_options
+from .cli_utils import parse_options, schedule_origin_batches
locale.setlocale(locale.LC_ALL, '')
@@ -288,6 +291,10 @@
def schedule_task(ctx, type, options, policy, priority, next_run):
"""Schedule one task from arguments.
+ The first argument is the name of the task type, further ones are
+ positional and keyword argument(s) of the task, in YAML format.
+ Keyword args are of the form key=value.
+
Usage sample:
swh-scheduler --database 'service=swh-scheduler' \
@@ -327,6 +334,54 @@
click.echo('\n'.join(output))
+@task.command('schedule_origins')
+@click.argument('type', nargs=1, required=True)
+@click.argument('options', nargs=-1)
+@click.option('--batch-size', '-b', 'origin_batch_size',
+              default=10, show_default=True, type=int,
+              help="Number of origins per task")
+@click.option('--min-id',
+              default=0, show_default=True, type=int,
+              help="Only schedule tasks for origins whose ID is greater")
+@click.option('--max-id',
+              default=None, type=int,
+              help="Only schedule tasks for origins whose ID is lower")
+@click.option('--storage-url', '-g',
+              help="URL of the (graph) storage API")
+@click.option('--dry-run/--no-dry-run',  # the '/' form implies a flag
+              default=False,
+              help='List only what would be scheduled.')
+@click.pass_context
+def schedule_origin_metadata_index(
+        ctx, type, options, storage_url, origin_batch_size,
+        min_id, max_id, dry_run):
+    """Schedules tasks for origins that are already known to the storage.
+
+    The first argument is the name of the task type, further ones are
+    keyword argument(s) of the task in the form key=value, where value is
+    in YAML format. Each task is given one batch (list) of origin ids.
+
+    Usage sample:
+
+    swh-scheduler --database 'service=swh-scheduler' \
+      task schedule_origins --storage-url URL indexer_origin_metadata
+    """
+    # Reject bad task arguments before building any storage client.
+    (args, kw) = parse_options(options)
+    if args:
+        raise click.ClickException('Only keyword arguments are allowed.')
+
+    scheduler = ctx.obj['scheduler']
+    if dry_run:
+        scheduler = None  # batches are still computed and reported
+    storage = get_storage('remote', {'url': storage_url})
+
+    origins = iter_origins(storage, origin_from=min_id, origin_to=max_id)
+    origin_ids = (origin['id'] for origin in origins)
+    schedule_origin_batches(
+        scheduler, type, origin_ids, origin_batch_size, kw)
+
+
@task.command('list-pending')
@click.argument('task-types', required=True, nargs=-1)
@click.option('--limit', '-l', required=False, type=click.INT,
diff --git a/swh/scheduler/cli_utils.py b/swh/scheduler/cli_utils.py
--- a/swh/scheduler/cli_utils.py
+++ b/swh/scheduler/cli_utils.py
@@ -3,9 +3,51 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import itertools
+
import click
import yaml
+from .utils import create_task_dict
+
+TASK_BATCH_SIZE = 1000 # Number of tasks per query to the scheduler
+
+
+def schedule_origin_batches(
+        scheduler, task_type, origins, origin_batch_size, kwargs):
+    """Create one 'oneshot' task per batch of origins, sent in bulk.
+
+    `origins` is cut into lists of at most `origin_batch_size` items, each
+    list being the single positional argument of a task; `kwargs` are its
+    keyword arguments. Tasks go to `scheduler` TASK_BATCH_SIZE at a time,
+    or nowhere if `scheduler` is falsy (dry run); progress is echoed."""
+    origins = iter(origins)  # islice() on a list would never advance it
+    nb_origins = 0
+    nb_tasks = 0
+
+    while True:
+        task_batch = []
+        for _ in range(TASK_BATCH_SIZE):
+            origin_batch = list(itertools.islice(origins, origin_batch_size))
+            if not origin_batch:
+                break
+            nb_origins += len(origin_batch)
+            task_batch.append(create_task_dict(
+                task_type, 'oneshot', origin_batch, **kwargs))
+
+        if not task_batch:  # origins exhausted
+            break
+        nb_tasks += len(task_batch)
+        if scheduler:
+            scheduler.create_tasks(task_batch)
+        click.echo('Scheduled %d tasks (%d origins).' % (nb_tasks, nb_origins))
+
+    if nb_tasks:
+        click.echo('Done.')
+    else:
+        click.echo('Nothing to do (no origin matched the criteria).')
+
def parse_argument(option):
try:
diff --git a/swh/scheduler/tests/test_cli.py b/swh/scheduler/tests/test_cli.py
--- a/swh/scheduler/tests/test_cli.py
+++ b/swh/scheduler/tests/test_cli.py
@@ -11,6 +11,8 @@
from click.testing import CliRunner
import pytest
+from swh.storage.in_memory import Storage
+
from swh.scheduler.cli import cli
from swh.scheduler.utils import create_task_dict
@@ -29,7 +31,8 @@
config_fd.write(CLI_CONFIG)
config_fd.seek(0)
get_scheduler_mock.return_value = scheduler
- result = runner.invoke(cli, ['-C' + config_fd.name] + args)
+ args = ['-C' + config_fd.name, '-l', 'WARNING'] + args
+ result = runner.invoke(cli, args)
if not catch_exceptions and result.exception:
print(result.output)
raise result.exception
@@ -51,7 +54,6 @@
csv_fd.name
])
expected = r'''
-\[INFO\] swh.core.config -- Loading config file .*
Created 2 tasks
Task 1
@@ -91,7 +93,6 @@
csv_fd.name
])
expected = r'''
-\[INFO\] swh.core.config -- Loading config file .*
Created 1 tasks
Task 1
@@ -116,7 +117,6 @@
'swh-test-ping', 'arg1', 'arg2', 'key=value',
])
expected = r'''
-\[INFO\] swh.core.config -- Loading config file .*
Created 1 tasks
Task 1
@@ -141,7 +141,6 @@
])
expected = r'''
-\[INFO\] swh.core.config -- Loading config file .*
Found 0 swh-test-ping tasks
'''.lstrip()
@@ -160,7 +159,6 @@
])
expected = r'''
-\[INFO\] swh.core.config -- Loading config file .*
Found 1 swh-test-ping tasks
Task 1
@@ -183,7 +181,6 @@
])
expected = r'''
-\[INFO\] swh.core.config -- Loading config file .*
Found 0 swh-test-ping tasks
'''.lstrip()
@@ -200,7 +197,6 @@
])
expected = r'''
-\[INFO\] swh.core.config -- Loading config file .*
Found 0 swh-test-ping tasks
'''.lstrip()
@@ -219,7 +215,6 @@
])
expected = r'''
-\[INFO\] swh.core.config -- Loading config file .*
Found 1 swh-test-ping tasks
Task 2
@@ -250,7 +245,6 @@
])
expected = r'''
-\[INFO\] swh.core.config -- Loading config file .*
Found 2 swh-test-ping tasks
Task 1
@@ -298,7 +292,6 @@
])
expected = r'''
-\[INFO\] swh.core.config -- Loading config file .*
Found 1 swh-test-ping tasks
Task 2
@@ -328,7 +321,6 @@
])
expected = r'''
-\[INFO\] swh.core.config -- Loading config file .*
Found 2 tasks
Task 1
@@ -369,7 +361,6 @@
])
expected = r'''
-\[INFO\] swh.core.config -- Loading config file .*
Found 1 tasks
Task 2
@@ -399,7 +390,6 @@
])
expected = r'''
-\[INFO\] swh.core.config -- Loading config file .*
Found 2 tasks
Task 2
@@ -440,7 +430,6 @@
])
expected = r'''
-\[INFO\] swh.core.config -- Loading config file .*
Found 2 tasks
Task 1
@@ -481,7 +470,6 @@
])
expected = r'''
-\[INFO\] swh.core.config -- Loading config file .*
Found 2 tasks
Task 1
@@ -525,7 +513,6 @@
])
expected = r'''
-\[INFO\] swh.core.config -- Loading config file .*
Found 1 tasks
Task 2
@@ -558,7 +545,6 @@
])
expected = r'''
-\[INFO\] swh.core.config -- Loading config file .*
Found 1 tasks
Task 1
@@ -575,3 +561,110 @@
'''.lstrip()
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+
+
+def _fill_storage_with_origins(storage, nb_origins):
+    """Add `nb_origins` origins to `storage`; origin number n has type
+    'type<n>' and URL 'http://example.com/<n>', n counting from 0."""
+    new_origins = [
+        dict(type='type%d' % n, url='http://example.com/%d' % n)
+        for n in range(nb_origins)
+    ]
+    storage.origin_add(new_origins)
+
+
+@pytest.fixture
+def storage():
+    """Yield a fresh in-memory Storage, which the CLI receives from any
+    call to swh.scheduler.cli.get_storage() while the test runs."""
+    in_memory = Storage()
+    with patch('swh.scheduler.cli.get_storage',
+               return_value=in_memory):
+        yield in_memory
+
+
+@patch('swh.scheduler.cli_utils.TASK_BATCH_SIZE', 3)
+def test_task_schedule_origins_dry_run(swh_scheduler, storage):
+    """--dry-run reports every batch but creates no task.
+
+    origin_batch_size * TASK_BATCH_SIZE (10 * 3) divides nb_origins (90),
+    so every batch is full."""
+    _fill_storage_with_origins(storage, 90)
+    cli_args = ['task', 'schedule_origins', '--dry-run', 'swh-test-ping']
+
+    result = invoke(swh_scheduler, False, cli_args)
+
+    # One progress line per batch of 3 tasks, then the final status.
+    expected = r'''
+Scheduled 3 tasks \(30 origins\).
+Scheduled 6 tasks \(60 origins\).
+Scheduled 9 tasks \(90 origins\).
+Done.
+'''.lstrip()
+    assert result.exit_code == 0, result.output
+    output_matches = re.fullmatch(expected, result.output, re.MULTILINE)
+    assert output_matches, repr(result.output)
+
+    # Nothing must have reached the scheduler.
+    scheduled = swh_scheduler.search_tasks()
+    assert len(scheduled) == 0
+
+
+@patch('swh.scheduler.cli_utils.TASK_BATCH_SIZE', 3)
+def test_task_schedule_origins(swh_scheduler, storage):
+    """Tests the scheduling when neither origin_batch_size (20) nor
+    TASK_BATCH_SIZE (3) is a divisor of nb_origins (70)."""
+    _fill_storage_with_origins(storage, 70)
+
+    result = invoke(swh_scheduler, False, [
+        'task', 'schedule_origins', 'swh-test-ping',
+        '--batch-size', '20',
+    ])
+
+    # Check the output: one full batch of 3 tasks, then a partial one
+    expected = r'''
+Scheduled 3 tasks \(60 origins\).
+Scheduled 4 tasks \(70 origins\).
+Done.
+'''.lstrip()
+    assert result.exit_code == 0, result.output
+    assert re.fullmatch(expected, result.output, re.MULTILINE), \
+        repr(result.output)
+
+    # Check scheduled tasks; the last one only gets the 10 leftover origins
+    tasks = swh_scheduler.search_tasks()
+    assert [task['arguments']['args'] for task in tasks] == [
+        [list(range(1, 21))],
+        [list(range(21, 41))],
+        [list(range(41, 61))],
+        [list(range(61, 71))],
+    ]
+    assert all(task['arguments']['kwargs'] == {} for task in tasks)
+
+
+def test_task_schedule_origins_kwargs(swh_scheduler, storage):
+    """Extra key=value arguments are YAML-decoded and passed to every task
+    as keyword arguments, alongside its batch of origin ids."""
+    _fill_storage_with_origins(storage, 30)
+    cli_args = [
+        'task', 'schedule_origins', 'swh-test-ping',
+        '--batch-size', '20',
+        'key1="value1"', 'key2="value2"',
+    ]
+    result = invoke(swh_scheduler, False, cli_args)
+
+    # 30 origins by batches of 20: one full task and one with 10 origins.
+    expected = r'''
+Scheduled 2 tasks \(30 origins\).
+Done.
+'''.lstrip()
+    assert result.exit_code == 0, result.output
+    matched = re.fullmatch(expected, result.output, re.MULTILINE)
+    assert matched, repr(result.output)
+
+    expected_kwargs = {'key1': 'value1', 'key2': 'value2'}
+    tasks = swh_scheduler.search_tasks()
+    assert len(tasks) == 2
+    first, second = tasks
+    assert first['arguments']['args'] == [list(range(1, 21))]
+    assert second['arguments']['args'] == [list(range(21, 31))]
+    assert all(t['arguments']['kwargs'] == expected_kwargs for t in tasks)
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Nov 5 2024, 4:29 PM (11 w, 18 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3223562
Attached To
D1196: Add an 'swh-scheduler task schedule_origins' command to run a task on all origins.
Event Timeline
Log In to Comment