Changeset View
Changeset View
Standalone View
Standalone View
swh/lister/pattern.py
# Copyright (C) 2020-2022 The Software Heritage developers | # Copyright (C) 2020-2022 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from __future__ import annotations | from __future__ import annotations | ||||
from dataclasses import dataclass | from dataclasses import dataclass | ||||
import logging | import logging | ||||
from typing import Any, Dict, Generic, Iterable, Iterator, List, Optional, TypeVar | from typing import Any, Dict, Generic, Iterable, Iterator, List, Optional, Set, TypeVar | ||||
from urllib.parse import urlparse | from urllib.parse import urlparse | ||||
import requests | import requests | ||||
from tenacity.before_sleep import before_sleep_log | from tenacity.before_sleep import before_sleep_log | ||||
from swh.core.config import load_from_envvar | from swh.core.config import load_from_envvar | ||||
from swh.core.utils import grouper | from swh.core.utils import grouper | ||||
from swh.scheduler import get_scheduler, model | from swh.scheduler import get_scheduler, model | ||||
▲ Show 20 Lines • Show All 104 Lines • ▼ Show 20 Lines | ): | ||||
self.updated = False | self.updated = False | ||||
self.session = requests.Session() | self.session = requests.Session() | ||||
# Declare the USER_AGENT is more sysadm-friendly for the forge we list | # Declare the USER_AGENT is more sysadm-friendly for the forge we list | ||||
self.session.headers.update( | self.session.headers.update( | ||||
{"User-Agent": USER_AGENT_TEMPLATE % self.LISTER_NAME} | {"User-Agent": USER_AGENT_TEMPLATE % self.LISTER_NAME} | ||||
) | ) | ||||
self.recorded_origins: Set[str] = set() | |||||
@http_retry(before_sleep=before_sleep_log(logger, logging.WARNING)) | @http_retry(before_sleep=before_sleep_log(logger, logging.WARNING)) | ||||
def http_request(self, url: str, method="GET", **kwargs) -> requests.Response: | def http_request(self, url: str, method="GET", **kwargs) -> requests.Response: | ||||
logger.debug("Fetching URL %s with params %s", url, kwargs.get("params")) | logger.debug("Fetching URL %s with params %s", url, kwargs.get("params")) | ||||
response = self.session.request(method, url, **kwargs) | response = self.session.request(method, url, **kwargs) | ||||
if response.status_code not in (200, 304): | if response.status_code not in (200, 304): | ||||
logger.warning( | logger.warning( | ||||
Show All 10 Lines | def run(self) -> ListerStats: | ||||
"""Run the lister. | """Run the lister. | ||||
Returns: | Returns: | ||||
A counter with the number of pages and origins seen for this run | A counter with the number of pages and origins seen for this run | ||||
of the lister. | of the lister. | ||||
""" | """ | ||||
full_stats = ListerStats() | full_stats = ListerStats() | ||||
self.recorded_origins = set() | |||||
try: | try: | ||||
for page in self.get_pages(): | for page in self.get_pages(): | ||||
full_stats.pages += 1 | full_stats.pages += 1 | ||||
origins = self.get_origins_from_page(page) | origins = self.get_origins_from_page(page) | ||||
full_stats.origins += self.send_origins(origins) | sent_origins = self.send_origins(origins) | ||||
self.recorded_origins.update(sent_origins) | |||||
full_stats.origins = len(self.recorded_origins) | |||||
self.commit_page(page) | self.commit_page(page) | ||||
finally: | finally: | ||||
self.finalize() | self.finalize() | ||||
if self.updated: | if self.updated: | ||||
self.set_state_in_scheduler() | self.set_state_in_scheduler() | ||||
return full_stats | return full_stats | ||||
▲ Show 20 Lines • Show All 79 Lines • ▼ Show 20 Lines | def commit_page(self, page: PageType) -> None: | ||||
This method can be used to update the state after a page of origins has been | This method can be used to update the state after a page of origins has been | ||||
successfully recorded in the scheduler backend. If the new state should be | successfully recorded in the scheduler backend. If the new state should be | ||||
recorded at the point the lister completes, the :attr:`updated` attribute must | recorded at the point the lister completes, the :attr:`updated` attribute must | ||||
be set. | be set. | ||||
""" | """ | ||||
pass | pass | ||||
def send_origins(self, origins: Iterable[model.ListedOrigin]) -> int: | def send_origins(self, origins: Iterable[model.ListedOrigin]) -> List[str]: | ||||
"""Record a list of :class:`model.ListedOrigin` in the scheduler. | """Record a list of :class:`model.ListedOrigin` in the scheduler. | ||||
Returns: | Returns: | ||||
the number of listed origins recorded in the scheduler | the list of origin URLs recorded in scheduler database | ||||
""" | """ | ||||
count = 0 | recorded_origins = [] | ||||
for batch_origins in grouper(origins, n=1000): | for batch_origins in grouper(origins, n=1000): | ||||
ret = self.scheduler.record_listed_origins(batch_origins) | ret = self.scheduler.record_listed_origins(batch_origins) | ||||
count += len(ret) | recorded_origins += [origin.url for origin in ret] | ||||
return count | return recorded_origins | ||||
@classmethod | @classmethod | ||||
def from_config(cls, scheduler: Dict[str, Any], **config: Any): | def from_config(cls, scheduler: Dict[str, Any], **config: Any): | ||||
"""Instantiate a lister from a configuration dict. | """Instantiate a lister from a configuration dict. | ||||
This is basically a backwards-compatibility shim for the CLI. | This is basically a backwards-compatibility shim for the CLI. | ||||
Args: | Args: | ||||
Show All 39 Lines |