Changeset View
Changeset View
Standalone View
Standalone View
swh/lister/debian/tests/test_lister.py
# Copyright (C) 2019-2021 The Software Heritage developers | # Copyright (C) 2019-2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from collections import defaultdict | from collections import defaultdict | ||||
from email.utils import formatdate | |||||
import os | import os | ||||
from pathlib import Path | from pathlib import Path | ||||
from typing import Dict, List, Set, Tuple | from typing import Dict, List, Set, Tuple | ||||
from debian.deb822 import Sources | from debian.deb822 import Sources | ||||
import pytest | import pytest | ||||
from swh.lister.debian.lister import ( | from swh.lister.debian.lister import ( | ||||
▲ Show 20 Lines • Show All 58 Lines • ▼ Show 20 Lines | for suite, sources in debian_sources.items(): | ||||
suite_pkg_info[suite] = defaultdict(list) | suite_pkg_info[suite] = defaultdict(list) | ||||
for pkg_src in Sources.iter_paragraphs(sources): | for pkg_src in Sources.iter_paragraphs(sources): | ||||
suite_pkg_info[suite][pkg_src["Package"]].append(pkg_src) | suite_pkg_info[suite][pkg_src["Package"]].append(pkg_src) | ||||
for idx_url, compression in lister.debian_index_urls(suite, _components[0]): | for idx_url, compression in lister.debian_index_urls(suite, _components[0]): | ||||
if compression: | if compression: | ||||
requests_mock.get(idx_url, status_code=404) | requests_mock.get(idx_url, status_code=404) | ||||
else: | else: | ||||
requests_mock.get(idx_url, text=sources) | requests_mock.get( | ||||
idx_url, | |||||
text=sources, | |||||
headers={"Last-Modified": formatdate(usegmt=True)}, | |||||
) | |||||
for idx_url, _ in lister.debian_index_urls(suite, _components[1]): | for idx_url, _ in lister.debian_index_urls(suite, _components[1]): | ||||
requests_mock.get(idx_url, status_code=404) | requests_mock.get(idx_url, status_code=404) | ||||
return lister, suite_pkg_info | return lister, suite_pkg_info | ||||
def _check_listed_origins( | def _check_listed_origins( | ||||
Show All 27 Lines | for suite, pkg_info in suite_pkg_info.items(): | ||||
# get ListerOrigin object from scheduler database | # get ListerOrigin object from scheduler database | ||||
filtered_origins = [ | filtered_origins = [ | ||||
scheduler_origin | scheduler_origin | ||||
for scheduler_origin in scheduler_origins | for scheduler_origin in scheduler_origins | ||||
if scheduler_origin.url == origin_url | if scheduler_origin.url == origin_url | ||||
] | ] | ||||
assert filtered_origins | assert filtered_origins | ||||
assert filtered_origins[0].last_update is not None | |||||
packages = filtered_origins[0].extra_loader_arguments["packages"] | packages = filtered_origins[0].extra_loader_arguments["packages"] | ||||
# check the version info are available | # check the version info are available | ||||
assert package_version_key in packages | assert package_version_key in packages | ||||
# check package files URIs are available | # check package files URIs are available | ||||
for file in pkg_src["files"]: | for file in pkg_src["files"]: | ||||
filename = file["name"] | filename = file["name"] | ||||
file_uri = os.path.join( | file_uri = os.path.join( | ||||
▲ Show 20 Lines • Show All 113 Lines • Show Last 20 Lines |