
diff --git a/ardumont/schedule/Pipfile b/ardumont/schedule/Pipfile
new file mode 100644
index 0000000..97a53c7
--- /dev/null
+++ b/ardumont/schedule/Pipfile
@@ -0,0 +1,15 @@
+[[source]]
+name = "pypi"
+url = "https://pypi.org/simple"
+verify_ssl = true
+
+[dev-packages]
+pdbpp = "*"
+ipython = "*"
+
+[packages]
+click = "*"
+celery = "*"
+kombu = "*"
+"swh.scheduler" = "==1.2.1"
+ipython = "*"
diff --git a/ardumont/schedule/README.md b/ardumont/schedule/README.md
new file mode 100644
index 0000000..f366bbe
--- /dev/null
+++ b/ardumont/schedule/README.md
@@ -0,0 +1,4 @@
+schedule tools
+==============
+
+Tools to schedule origins
diff --git a/ardumont/schedule_with_queue_length_check.py b/ardumont/schedule/schedule_with_queue_length_check.py
similarity index 78%
rename from ardumont/schedule_with_queue_length_check.py
rename to ardumont/schedule/schedule_with_queue_length_check.py
index 4117499..689ae63 100755
--- a/ardumont/schedule_with_queue_length_check.py
+++ b/ardumont/schedule/schedule_with_queue_length_check.py
@@ -1,327 +1,388 @@
#!/usr/bin/env python3
-# Copyright (C) 2017-2021 The Software Heritage developers
+# Copyright (C) 2017-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import click
import sys
import time
-from kombu.utils.uuid import uuid
-
-from typing import Optional
-from swh.core.config import load_from_envvar
-from swh.model.hashutil import hash_to_hex
-from celery.exceptions import NotRegistered
-try:
- from swh.indexer.producer import gen_sha1
-except ImportError:
- pass
+from typing import Dict, Optional
from swh.scheduler.celery_backend.config import app as main_app
MAX_WAITING_TIME = 10
def stdin_to_mercurial_tasks(batch_size):
"""Generates from stdin the proper task argument for the
loader-mercurial worker.
Args:
batch_size (int): Not used
Yields:
expected dictionary of 'arguments' key
"""
for line in sys.stdin:
line = line.rstrip()
origin_url, archive_path = line.split(' ')
yield {
'arguments': {
'args': [],
'kwargs': {
'origin_url': origin_url,
'archive_path': archive_path,
'visit_date': 'Tue, 3 May 2016 17:16:32 +0200',
},
},
}
def stdin_to_bitbucket_mercurial_tasks(batch_size):
"""Generates from stdin the proper task argument for the
bitbucket loader-mercurial worker.
Args:
batch_size (int): Not used
Yields:
expected dictionary of 'arguments' key
"""
for line in sys.stdin:
line = line.rstrip()
origin_url, directory, visit_date_day, visit_date_hour = line.split(' ')
visit_date = ' '.join([visit_date_day, visit_date_hour])
yield {
'arguments': {
'args': [],
'kwargs': {
'url': origin_url,
'directory': directory,
'visit_date': visit_date,
},
},
}
def stdin_to_svn_tasks(batch_size, type='svn'):
"""Generates from stdin the proper task argument for the loader-svn
worker.
Args:
batch_size (int): Not used
Yields:
expected dictionary of 'arguments' key
"""
for line in sys.stdin:
line = line.rstrip()
origin_url, path = line.split(' ')
kwargs = {
'visit_date': 'Tue, 3 May 2016 17:16:32 +0200',
'start_from_scratch': True,
}
if type == 'svn':
kwargs.update({
'svn_url': origin_url,
})
else:
kwargs.update({
'archive_path': path,
'origin_url': origin_url,
})
yield {
'arguments': {
'args': [],
'kwargs': kwargs,
},
}
-def stdin_to_index_tasks(batch_size=1000):
- """Generates from stdin the proper task argument for the orchestrator.
+def stdin_to_git_large_tasks(batch_size, type='git'):
+ """Generates from stdin the proper task argument for the loader-git worker.
Args:
- batch_size (int): Number of sha1s to group together
+ batch_size (int): Not used
Yields:
expected dictionary of 'arguments' key
"""
- for sha1s in gen_sha1(batch=batch_size):
+ for line in sys.stdin:
+ origin_url = line.rstrip()
+ kwargs = {
+ 'url': origin_url,
+ 'lister_name': 'github',
+ 'lister_instance_name': 'github',
+ 'pack_size_bytes': 34359738368,
+ }
yield {
'arguments': {
- 'args': [sha1s],
- 'kwargs': {}
+ 'args': [],
+ 'kwargs': kwargs,
},
}
-def print_last_hash(d):
+def stdin_to_git_normal_tasks(batch_size, type='git'):
+ """Generates from stdin the proper task argument for the loader-git worker.
+
+ Args:
+ batch_size (int): Not used
+
+ Yields:
+ expected dictionary of 'arguments' key
+
+ """
+ for line in sys.stdin:
+ origin_url = line.rstrip()
+ kwargs = {
+ 'url': origin_url,
+ 'lister_name': 'github',
+ 'lister_instance_name': 'github',
+ }
+ yield {
+ 'arguments': {
+ 'args': [],
+ 'kwargs': kwargs,
+ },
+ }
+
+
+def stdin_to_index_tasks(batch_size=1000):
+ """Generates from stdin the proper task argument for the orchestrator.
+
+ Args:
+ batch_size (int): Number of sha1s to group together
+
+ Yields:
+ expected dictionary of 'arguments' key
+
+ """
+ try:
+ from swh.indexer.producer import gen_sha1
+ for sha1s in gen_sha1(batch=batch_size):
+ yield {
+ 'arguments': {
+ 'args': [sha1s],
+ 'kwargs': {}
+ },
+ }
+ except ImportError:
+ pass
+
+
+def print_last_hash(arguments: Dict) -> None:
"""Given a dict of arguments, take the sha1s list, print the last
element as hex hash.
"""
- l = d['args']
+ from swh.model.hashutil import hash_to_hex
+
+ l = arguments['args']
if l:
print(hash_to_hex(l[0][-1]))
QUEUES = {
'svndump': { # for svn, we use the same queue for length checking
# and scheduling
'task_name': 'swh.loader.svn.tasks.MountAndLoadSvnRepository',
'threshold': 1000,
        # task_generator_fn: the function used to turn input lines into tasks
'task_generator_fn': (lambda b: stdin_to_svn_tasks(b, type='dump')),
'print_fn': print,
},
'svn': { # for svn, we use the same queue for length checking
- # and scheduling
+ # and scheduling
'task_name': 'swh.loader.svn.tasks.LoadSvnRepository',
'threshold': 1000,
        # task_generator_fn: the function used to turn input lines into tasks
'task_generator_fn': stdin_to_svn_tasks,
'print_fn': print,
},
'mercurial': { # for mercurial, we use the same queue for length
# checking and scheduling
'task_name': 'swh.loader.mercurial.tasks.LoadArchiveMercurial',
'threshold': 1000,
        # task_generator_fn: the function used to turn input lines into tasks
'task_generator_fn': stdin_to_mercurial_tasks,
'print_fn': print,
},
    'bitbucket-mercurial': {
'task_name': 'oneshot:swh.loader.mercurial.tasks.LoadMercurial',
'threshold': None,
        # task_generator_fn: the function used to turn input lines into tasks
'task_generator_fn': stdin_to_bitbucket_mercurial_tasks,
'print_fn': print,
},
'indexer': { # for indexer, we schedule using the orchestrator's queue
# we check the length on the mimetype queue though
'task_name': 'swh.indexer.tasks.OrchestratorAllContents',
'threshold': 1000,
'task_generator_fn': stdin_to_index_tasks,
'print_fn': print_last_hash,
- }
+ },
+ 'oneshot-large-git': {
+ 'task_name': 'oneshot:swh.loader.git.tasks.UpdateGitRepository',
+ 'threshold': 1000,
+        # task_generator_fn: the function used to turn input lines into tasks
+ 'task_generator_fn': stdin_to_git_large_tasks,
+ 'print_fn': print,
+ },
+ 'oneshot-normal-git': {
+ 'task_name': 'oneshot2:swh.loader.git.tasks.UpdateGitRepository',
+ 'threshold': 1000,
+        # task_generator_fn: the function used to turn input lines into tasks
+ 'task_generator_fn': stdin_to_git_normal_tasks,
+ 'print_fn': print,
+ },
}
def queue_length_get(app, queue_name: str) -> Optional[int]:
"""Read the queue's current length.
Args:
app: Application
queue_name: fqdn queue name to retrieve queue length from
Returns:
queue_name's length if any
"""
try:
queue_length = app.get_queue_length(app.tasks[queue_name].task_queue)
- except:
+ except Exception:
queue_length = None
return queue_length
def send_new_tasks(app, queues_to_check):
- """Can we send new tasks for scheduling? To answer this, we check the
- queues_to_check's current number of scheduled tasks.
+    """Check whether new tasks can be sent for scheduling, given the
+    queues_to_check's current number of scheduled tasks.
    If any of queues_to_check sees its threshold reached, we cannot
    send new tasks, so this returns False. Otherwise, we can send new
    tasks, so this returns True.
Args:
app: Application
queues_to_check ([dict]): List of dict with keys 'task_name',
'threshold'.
Returns:
True if we can send new tasks, False otherwise
"""
for queue_to_check in queues_to_check:
queue_name = queue_to_check['task_name']
threshold = queue_to_check['threshold']
_queue_length = queue_length_get(app, queue_name)
if _queue_length is not None and _queue_length >= threshold:
return False
return True
@click.command(help='Read from stdin and send messages to the queue')
@click.option('--queue-name', help='Queue concerned',
type=click.Choice(QUEUES))
@click.option('--threshold', help='Threshold for the queue',
type=click.INT,
default=1000)
@click.option('--batch-size', help='Batch size if batching is possible',
type=click.INT,
default=1000)
@click.option('--waiting-time', help='Waiting time between checks',
type=click.INT,
default=MAX_WAITING_TIME)
def main(queue_name, threshold, batch_size, waiting_time, app=main_app):
if queue_name not in QUEUES:
        raise ValueError("Unsupported queue %s, possible values: %s" % (
            queue_name, ", ".join(QUEUES)))
for module in app.conf.CELERY_IMPORTS:
__import__(module)
queue_information = QUEUES[queue_name]
task_name = queue_information['task_name']
if ":" in task_name:
task_name_without_prefix = task_name.split(":")[1]
else:
task_name_without_prefix = task_name
- scheduling_task = app.tasks[task_name_without_prefix]
-
if not threshold:
threshold = queue_information['threshold']
    # Retrieve the queues whose current length must be checked against the
    # threshold. If none is provided (default case), use the scheduling
    # queue itself as the checking queue
queues_to_check = queue_information.get('queues_to_check', [{
'task_name': task_name_without_prefix,
'threshold': threshold,
}])
while True:
throttled = False
remains_data = False
pending_tasks = []
if send_new_tasks(app, queues_to_check):
# we can send new tasks, compute how many we can send
queue_length = queue_length_get(app, task_name_without_prefix)
if queue_length is not None:
nb_tasks_to_send = threshold - queue_length
else:
nb_tasks_to_send = threshold
else:
nb_tasks_to_send = 0
if nb_tasks_to_send > 0:
count = 0
task_fn = queue_information['task_generator_fn']
for _task in task_fn(batch_size):
pending_tasks.append(_task)
count += 1
if count >= nb_tasks_to_send:
throttled = True
remains_data = True
break
        if not pending_tasks:
            # no task was generated; check whether data remains on stdin
            if not remains_data:
                # no more data, exit the loop
                break
print_fn = queue_information.get('print_fn', print)
+ from kombu.utils.uuid import uuid
for _task in pending_tasks:
app.send_task(
task_name_without_prefix,
task_id=uuid(),
args=_task["arguments"]["args"],
kwargs=_task["arguments"]["kwargs"],
queue=task_name,
)
print_fn(_task['arguments'])
else:
throttled = True
if throttled:
time.sleep(waiting_time)
if __name__ == '__main__':
main()
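
A quick usage sketch (the file name origins.txt is hypothetical; the options
and queue names come from the click declarations above): the script reads one
input line per origin from stdin, turns each line into a task with the queue's
task_generator_fn, and throttles itself, sleeping --waiting-time seconds
whenever the checked queue is at or above its threshold.

    # one GitHub origin URL per line, the format expected by
    # stdin_to_git_normal_tasks
    cat origins.txt | ./schedule_with_queue_length_check.py \
        --queue-name oneshot-normal-git \
        --threshold 1000 \
        --waiting-time 10

For the svn and mercurial queues, each input line instead carries
space-separated fields (e.g. "origin_url archive_path" for mercurial),
matching the corresponding stdin_to_*_tasks generator.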
