Changeset View
Changeset View
Standalone View
Standalone View
swh/lister/core/lister_base.py
# Copyright (C) 2015-2018 the Software Heritage developers | # Copyright (C) 2015-2019 the Software Heritage developers | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import abc | import abc | ||||
import datetime | import datetime | ||||
import gzip | import gzip | ||||
import json | import json | ||||
import logging | import logging | ||||
import os | import os | ||||
import re | import re | ||||
import time | import time | ||||
from sqlalchemy import create_engine, func | from sqlalchemy import create_engine, func | ||||
from sqlalchemy.orm import sessionmaker | from sqlalchemy.orm import sessionmaker | ||||
from typing import Any, Type, Union | from typing import Any, Sequence, Type, Union | ||||
from swh.core import config | from swh.core import config | ||||
from swh.scheduler import get_scheduler, utils | from swh.scheduler import get_scheduler, utils | ||||
from .abstractattribute import AbstractAttribute | from .abstractattribute import AbstractAttribute | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
▲ Show 20 Lines • Show All 41 Lines • ▼ Show 20 Lines | class ListerBase(abc.ABC, config.SWHConfig): | ||||
""" | """ | ||||
MODEL = AbstractAttribute( | MODEL = AbstractAttribute( | ||||
'Subclass type (not instance) of swh.lister.core.models.ModelBase ' | 'Subclass type (not instance) of swh.lister.core.models.ModelBase ' | ||||
'customized for a specific service.' | 'customized for a specific service.' | ||||
) # type: Union[AbstractAttribute, Type[Any]] | ) # type: Union[AbstractAttribute, Type[Any]] | ||||
LISTER_NAME = AbstractAttribute( | LISTER_NAME = AbstractAttribute( | ||||
"Lister's name") # type: Union[AbstractAttribute, str] | "Lister's name") # type: Union[AbstractAttribute, str] | ||||
# Lister's output scheduler task types (default to empty) | |||||
SCHEDULED_TASK_TYPES: Sequence[str] = [] | |||||
def transport_request(self, identifier): | def transport_request(self, identifier): | ||||
"""Given a target endpoint identifier to query, try once to request it. | """Given a target endpoint identifier to query, try once to request it. | ||||
Implementation of this method determines the network request protocol. | Implementation of this method determines the network request protocol. | ||||
Args: | Args: | ||||
identifier (string): unique identifier for an endpoint query. | identifier (string): unique identifier for an endpoint query. | ||||
▲ Show 20 Lines • Show All 170 Lines • ▼ Show 20 Lines | def __init__(self, override_config=None): | ||||
self.config.update(override_config) | self.config.update(override_config) | ||||
logger.debug('%s CONFIG=%s' % (self, self.config)) | logger.debug('%s CONFIG=%s' % (self, self.config)) | ||||
self.scheduler = get_scheduler(**self.config['scheduler']) | self.scheduler = get_scheduler(**self.config['scheduler']) | ||||
self.db_engine = create_engine(self.config['lister']['args']['db']) | self.db_engine = create_engine(self.config['lister']['args']['db']) | ||||
self.mk_session = sessionmaker(bind=self.db_engine) | self.mk_session = sessionmaker(bind=self.db_engine) | ||||
self.db_session = self.mk_session() | self.db_session = self.mk_session() | ||||
def checks(self): | |||||
"""Basic checks on lister configuration. | |||||
This checks the expected output task types exist in the scheduler. It | |||||
it does not, this raises a ValueError. | |||||
""" | |||||
missing_types = [] | |||||
for task_type in self.SCHEDULED_TASK_TYPES: | |||||
ok = self.scheduler.get_task_type(task_type) | |||||
if not ok: | |||||
missing_types.append(task_type) | |||||
if missing_types: | |||||
raise ValueError( | |||||
"Misconfigured lister: Missing %s type in scheduler" % ( | |||||
','.join(missing_types))) | |||||
def reset_backoff(self): | def reset_backoff(self): | ||||
"""Reset exponential backoff timeout to initial level.""" | """Reset exponential backoff timeout to initial level.""" | ||||
self.backoff = self.INITIAL_BACKOFF | self.backoff = self.INITIAL_BACKOFF | ||||
def back_off(self): | def back_off(self): | ||||
"""Get next exponential backoff timeout.""" | """Get next exponential backoff timeout.""" | ||||
ret = self.backoff | ret = self.backoff | ||||
self.backoff *= 10 | self.backoff *= 10 | ||||
▲ Show 20 Lines • Show All 254 Lines • Show Last 20 Lines |