Changeset View
Changeset View
Standalone View
Standalone View
swh/lister/core/lister_base.py
Show All 10 Lines | |||||
import re | import re | ||||
import time | import time | ||||
from sqlalchemy import create_engine, func | from sqlalchemy import create_engine, func | ||||
from sqlalchemy.orm import sessionmaker | from sqlalchemy.orm import sessionmaker | ||||
from typing import Any, Dict, Type, Union | from typing import Any, Dict, Type, Union | ||||
from swh.core import config | from swh.core import config | ||||
from swh.core.utils import grouper | |||||
from swh.scheduler import get_scheduler, utils | from swh.scheduler import get_scheduler, utils | ||||
from .abstractattribute import AbstractAttribute | from .abstractattribute import AbstractAttribute | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
▲ Show 20 Lines • Show All 436 Lines • ▼ Show 20 Lines | def schedule_missing_tasks(self, models_list, injected_repos): | ||||
# scheduling | # scheduling | ||||
if 'policy' in self.config: | if 'policy' in self.config: | ||||
m['policy'] = self.config['policy'] | m['policy'] = self.config['policy'] | ||||
if 'priority' in self.config: | if 'priority' in self.config: | ||||
m['priority'] = self.config['priority'] | m['priority'] = self.config['priority'] | ||||
task_dict = self.task_dict(**m) | task_dict = self.task_dict(**m) | ||||
tasks[_task_key(task_dict)] = (ir, m, task_dict) | tasks[_task_key(task_dict)] = (ir, m, task_dict) | ||||
new_tasks = self.scheduler.create_tasks( | gen_tasks = (task_dicts for (_, _, task_dicts) in tasks.values()) | ||||
(task_dicts for (_, _, task_dicts) in tasks.values())) | for grouped_tasks in grouper(gen_tasks, n=1000): | ||||
new_tasks = self.scheduler.create_tasks(list(grouped_tasks)) | |||||
for task in new_tasks: | for task in new_tasks: | ||||
ir, m, _ = tasks[_task_key(task)] | ir, m, _ = tasks[_task_key(task)] | ||||
ir.task_id = task['id'] | ir.task_id = task['id'] | ||||
def ingest_data(self, identifier, checks=False): | def ingest_data(self, identifier, checks=False): | ||||
"""The core data fetch sequence. Request server endpoint. Simplify and | """The core data fetch sequence. Request server endpoint. Simplify and | ||||
filter response list of repositories. Inject repo information into | filter response list of repositories. Inject repo information into | ||||
local db. Queue loader tasks for linked repositories. | local db. Queue loader tasks for linked repositories. | ||||
Args: | Args: | ||||
identifier: Resource identifier. | identifier: Resource identifier. | ||||
Show All 39 Lines |