Changeset View
Changeset View
Standalone View
Standalone View
swh/lister/debian/lister.py
# Copyright (C) 2017-2021 The Software Heritage developers | # Copyright (C) 2017-2022 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import bz2 | import bz2 | ||||
from collections import defaultdict | from collections import defaultdict | ||||
from dataclasses import dataclass, field | from dataclasses import dataclass, field | ||||
from email.utils import parsedate_to_datetime | from email.utils import parsedate_to_datetime | ||||
import gzip | import gzip | ||||
from itertools import product | from itertools import product | ||||
import logging | import logging | ||||
import lzma | import lzma | ||||
import os | import os | ||||
from typing import Any, Callable, Dict, Iterator, List, Optional, Set, Tuple | from typing import Any, Callable, Dict, Iterator, List, Optional, Set, Tuple | ||||
from urllib.parse import urljoin | from urllib.parse import urljoin | ||||
from debian.deb822 import Sources | from debian.deb822 import Sources | ||||
import requests | from requests.exceptions import HTTPError | ||||
from swh.scheduler.interface import SchedulerInterface | from swh.scheduler.interface import SchedulerInterface | ||||
from swh.scheduler.model import ListedOrigin | from swh.scheduler.model import ListedOrigin | ||||
from .. import USER_AGENT | |||||
from ..pattern import CredentialsType, Lister | from ..pattern import CredentialsType, Lister | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
decompressors: Dict[str, Callable[[Any], Any]] = { | decompressors: Dict[str, Callable[[Any], Any]] = { | ||||
"gz": lambda f: gzip.GzipFile(fileobj=f), | "gz": lambda f: gzip.GzipFile(fileobj=f), | ||||
"bz2": bz2.BZ2File, | "bz2": bz2.BZ2File, | ||||
"xz": lzma.LZMAFile, | "xz": lzma.LZMAFile, | ||||
▲ Show 20 Lines • Show All 56 Lines • ▼ Show 20 Lines | ): | ||||
# to ensure urljoin will produce valid Sources URL | # to ensure urljoin will produce valid Sources URL | ||||
if not self.url.endswith("/"): | if not self.url.endswith("/"): | ||||
self.url += "/" | self.url += "/" | ||||
self.distribution = distribution | self.distribution = distribution | ||||
self.suites = suites | self.suites = suites | ||||
self.components = components | self.components = components | ||||
self.session = requests.Session() | |||||
self.session.headers.update({"User-Agent": USER_AGENT}) | |||||
# will hold all listed origins info | # will hold all listed origins info | ||||
self.listed_origins: Dict[DebianOrigin, ListedOrigin] = {} | self.listed_origins: Dict[DebianOrigin, ListedOrigin] = {} | ||||
# will contain origin urls that have already been listed | # will contain origin urls that have already been listed | ||||
# in a previous page | # in a previous page | ||||
self.sent_origins: Set[DebianOrigin] = set() | self.sent_origins: Set[DebianOrigin] = set() | ||||
# will contain already listed package info that need to be sent | # will contain already listed package info that need to be sent | ||||
# to the scheduler for update in the commit_page method | # to the scheduler for update in the commit_page method | ||||
self.origins_to_update: Dict[DebianOrigin, ListedOrigin] = {} | self.origins_to_update: Dict[DebianOrigin, ListedOrigin] = {} | ||||
Show All 18 Lines | ) -> Iterator[Tuple[str, str]]: | ||||
] | ] | ||||
for base_url, ext in product(base_urls, compression_exts): | for base_url, ext in product(base_urls, compression_exts): | ||||
yield (f"{base_url}.{ext}", ext) | yield (f"{base_url}.{ext}", ext) | ||||
yield (base_url, "") | yield (base_url, "") | ||||
def page_request(self, suite: Suite, component: Component) -> DebianPageType: | def page_request(self, suite: Suite, component: Component) -> DebianPageType: | ||||
"""Return parsed package Sources file for a given debian suite and component.""" | """Return parsed package Sources file for a given debian suite and component.""" | ||||
for url, compression in self.debian_index_urls(suite, component): | for url, compression in self.debian_index_urls(suite, component): | ||||
response = requests.get(url, stream=True) | try: | ||||
logging.debug("Fetched URL: %s, status code: %s", url, response.status_code) | response = self.http_request(url, stream=True) | ||||
if response.status_code == 200: | except HTTPError: | ||||
pass | |||||
else: | |||||
last_modified = response.headers.get("Last-Modified") | last_modified = response.headers.get("Last-Modified") | ||||
self.last_sources_update = ( | self.last_sources_update = ( | ||||
parsedate_to_datetime(last_modified) if last_modified else None | parsedate_to_datetime(last_modified) if last_modified else None | ||||
) | ) | ||||
decompressor = decompressors.get(compression) | decompressor = decompressors.get(compression) | ||||
if decompressor: | if decompressor: | ||||
data = decompressor(response.raw).readlines() | data = decompressor(response.raw).readlines() | ||||
else: | else: | ||||
▲ Show 20 Lines • Show All 166 Lines • Show Last 20 Lines |