swh/lister/arch/lister.py
- This file was added.
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import datetime
import logging
from pathlib import Path
import re
import tarfile
from typing import Any, Dict, Iterator, List, Optional
from urllib.parse import unquote, urljoin

from bs4 import BeautifulSoup
import requests

from swh.model.hashutil import hash_to_hex
from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.model import ListedOrigin

from ..pattern import CredentialsType, StatelessLister

logger = logging.getLogger(__name__)

# Aliasing the page results returned by the `get_pages` method from the lister.
ArchListerPage = List[Dict[str, Any]]
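# A single page entry (illustrative shape, matching what ``get_pages`` builds
# below; values are examples, not real listings):
#   {"name": "dialog", "version": "1:1.3_20220414-1",
#    "last_modified": datetime(...), "url": "https://...",
#    "versions": [...], "data": {...}}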
def size_to_bytes(size: str) -> int:
    """Convert a human readable file size to bytes.

    The resulting value is an approximation, as the input value is in most
    cases rounded.

    Args:
        size: A string representing a human readable file size (eg: '500K')

    Returns:
        A decimal representation of file size

    Examples::

        >>> size_to_bytes("500")
        500
        >>> size_to_bytes("1K")
        1000
    """
    units = {
        "K": 1000,
        "M": 1000**2,
        "G": 1000**3,
        "T": 1000**4,
        "P": 1000**5,
        "E": 1000**6,
        "Z": 1000**7,
        "Y": 1000**8,
    }
    if size.endswith(tuple(units.keys())):
        v, u = (size[:-1], size[-1])
        return int(v) * units[u]
    else:
        return int(size)
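# Note: directory listings show rounded sizes, so the computed length is only
# approximate; e.g. a file listed as "180K" maps back to 180_000 bytes here,
# even though its real size may differ by a few percent (illustrative values).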
class ArchLister(StatelessLister[ArchListerPage]):
    """List Arch Linux origins from the 'core', 'extra', and 'community'
    repositories.

    It downloads core.files.tar.gz, extra.files.tar.gz and
    community.files.tar.gz from https://archive.archlinux.org/repos/last/,
    extracts them to a temporary directory and then walks through each
    'desc' file.

    Each 'desc' file describes the latest released version of a package and
    helps to build an origin url from which to scrape artifact metadata.
    """

    LISTER_NAME = "arch"
    VISIT_TYPE = "arch"
    INSTANCE = "arch"

    DESTINATION_PATH = Path("/tmp/archlinux_archive")

    ARCH_PACKAGE_URL_PATTERN = (
        "https://archive.archlinux.org/packages/{name[0]}/{name}/"
    )
    ARCH_PACKAGE_DOWNLOAD_URL_PATTERN = (
        "https://archive.archlinux.org/packages/{name[0]}/{name}/{filename}"
    )
    ARCH_API_URL_PATTERN = "https://archlinux.org/packages/{repo}/{arch}/{pkgname}/json"
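    # ``{name[0]}`` expands (via str.format indexing) to the first letter of
    # the package name, matching the archive's one-letter directory layout.
    # Illustrative expansion:
    #   ARCH_PACKAGE_URL_PATTERN.format(name="dialog")
    #   -> "https://archive.archlinux.org/packages/d/dialog/"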
    def __init__(
        self,
        scheduler: SchedulerInterface,
        credentials: Optional[CredentialsType] = None,
        repos: List[str] = ["core", "extra", "community"],
        mirror_url: str = "https://archive.archlinux.org/",
    ):
        super().__init__(
            scheduler=scheduler,
            credentials=credentials,
            url=mirror_url,
            instance=self.INSTANCE,
        )
        self.repos = repos
    def scrap_package_versions(self, name: str, repo: str) -> List[Dict[str, Any]]:
        """Given a package 'name' and 'repo', make an HTTP call to the origin
        url and parse its content to get package versions artifact data.

        Args:
            name: Package name
            repo: The repository the package belongs to (one of self.repos)

        Returns:
            A list of dicts, one per version

            Example::

                [
                    {"url": "https://archive.archlinux.org/packages/d/dialog/dialog-1:1.3_20190211-1-x86_64.pkg.tar.xz",  # noqa: B950
                     "arch": "x86_64",
                     "repo": "core",
                     "length": 180000,
                     "filename": "dialog-1:1.3_20190211-1-x86_64.pkg.tar.xz",
                     "last_modified": "2019-02-13T08:36:00"},
                ]
        """
        assert repo in self.repos

        url = self.ARCH_PACKAGE_URL_PATTERN.format(name=name)
        soup = BeautifulSoup(requests.get(url).text, "html.parser")
        links = soup.find_all("a", href=True)

        # drop the first link (used to go up to the parent directory)
        links.pop(0)

        versions = []
        for link in links:
            # filename displayed can be cropped if name is too long,
            # get it from href instead
            filename = unquote(link.attrs["href"])

            if filename.endswith((".tar.xz", ".tar.zst")):
                # Extract arch from filename
                arch_rex = re.compile(
                    r"^(.*)-(?P<arch>any|i686|x86_64)(.pkg.tar.(?:zst|xz))$"
                )
                m = arch_rex.match(filename)
                if not m:
                    logger.debug(
                        "Can not find a match for architecture in %s", filename
                    )
                    # skip this link instead of aborting the whole listing
                    continue
                arch = m.group("arch")

                # Extract last_modified and an approximate file size
                raw_text = link.next_sibling
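                # ``raw_text`` is the text node following the link in the
                # archive's autoindex page; illustrative content:
                #   "02-Feb-2019 09:07              180K"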
                raw_text_rex = re.compile(
                    r"^(?P<last_modified>\d+-\w+-\d+ \d\d:\d\d)\s+(?P<size>\w+)$"
                )
                s = raw_text_rex.search(raw_text.strip())
                if not s:
                    logger.debug(
                        "Can not find a match for 'last_modified' and/or "
                        "'size' in %r",
                        raw_text,
                    )
                    # skip this link instead of aborting the whole listing
                    continue
                assert len(s.groups()) == 2
                last_modified, size = s.groups()

                # format as expected
                last_modified = datetime.datetime.strptime(
                    last_modified, "%d-%b-%Y %H:%M"
                ).isoformat()
                length = size_to_bytes(size)  # we want bytes

                # link url is relative, make a canonical one
                url = self.ARCH_PACKAGE_DOWNLOAD_URL_PATTERN.format(
                    name=name, filename=filename
                )

                versions.append(
                    dict(
                        repo=repo,
                        arch=arch,
                        filename=filename,
                        url=url,
                        last_modified=last_modified,
                        length=length,
                    )
                )
        return versions
    def get_repo_archive(self, repo: str) -> Path:
        """Given a repo (one of self.repos), retrieve the {repo}.files.tar.gz
        archive, which contains a 'desc' file for each package of that repo.

        Args:
            repo: name of the repo

        Returns:
            a directory Path where the archive has been extracted to.
        """
        prefix = urljoin(self.url, "/repos/last/")
        url = urljoin(prefix, f"{repo}/os/x86_64/{repo}.files.tar.gz")
        filename = url.split("/")[-1]

        archive_path = Path(self.DESTINATION_PATH, filename)
        res = requests.get(url)
        archive_path.parent.mkdir(parents=True, exist_ok=True)
        archive_path.write_bytes(res.content)

        extract_to = Path(str(archive_path).split(".tar.gz")[0])
        with tarfile.open(archive_path) as tar:
            tar.extractall(path=extract_to)
        return extract_to
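    # The extracted tree holds one directory per package version, each with a
    # 'desc' file inside; illustrative layout:
    #   /tmp/archlinux_archive/core.files/
    #     dialog-1:1.3_20220414-1/desc
    #     gcc-12.1.0-2/desc
    #     ...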
    def parse_desc_file(self, path: Path, repo: str) -> Dict[str, Any]:
        """Extract package information from a 'desc' file.

        Args:
            path: A path to a 'desc' file on disk
            repo: The repo the package belongs to

        Returns:
            A dict of metadata

            Example::

                {'api_url': 'https://archlinux.org/packages/core/x86_64/dialog/json',
                 'arch': 'x86_64',
                 'base': 'dialog',
                 'builddate': '1650081535',
                 'csize': '203028',
                 'desc': 'A tool to display dialog boxes from shell scripts',
                 'filename': 'dialog-1:1.3_20220414-1-x86_64.pkg.tar.zst',
                 'isize': '483988',
                 'license': 'LGPL2.1',
                 'md5sum': '06407c0cb11c50d7bf83d600f2e8107c',
                 'name': 'dialog',
                 'packager': 'Evangelos Foutras <foutrelis@archlinux.org>',
                 'pgpsig': 'pgpsig content xxx',
                 'project_url': 'https://invisible-island.net/dialog/',
                 'provides': 'libdialog.so=15-64',
                 'repo': 'core',
                 'sha256sum': 'ef8c8971f591de7db0f455970ef5d81d5aced1ddf139f963f16f6730b1851fa7',
                 'url': 'https://archive.archlinux.org/packages/.all/dialog-1:1.3_20220414-1-x86_64.pkg.tar.zst',  # noqa: B950
                 'version': '1:1.3_20220414-1'}
        """
        rex = re.compile(r"^\%(?P<k>\w+)\%\n(?P<v>.*)\n$", re.M)
        with path.open("rb") as content:
            parsed = rex.findall(content.read().decode())
            data = {entry[0].lower(): entry[1] for entry in parsed}

        data["project_url"] = data["url"]
        data["url"] = urljoin(self.url, f"/packages/.all/{data['filename']}")
        data["repo"] = repo
        data["api_url"] = self.ARCH_API_URL_PATTERN.format(
            repo=repo, arch=data["arch"], pkgname=data["name"]
        )
        return data
    def get_pages(self) -> Iterator[ArchListerPage]:
        """Yield one page of packages per repo (one of self.repos).

        Each page is a list of packages belonging to that repo.
        """
        for repo in self.repos:
            page = []
            archive = self.get_repo_archive(repo=repo)

            packages_desc = list(archive.glob("**/desc"))
            logger.debug(
                "Processing %s source packages info from %s repository "
                "(%s packages)",
                self.instance,
                repo,
                len(packages_desc),
            )

            for package_desc in packages_desc:
                data = self.parse_desc_file(package_desc, repo)

                data["checksums"] = {
                    "md5sum": hash_to_hex(data["md5sum"]),
                    "sha256sum": hash_to_hex(data["sha256sum"]),
                }

                # 'builddate' is a Unix epoch, so interpret it as UTC
                last_modified = datetime.datetime.fromtimestamp(
                    int(data["builddate"]), tz=datetime.timezone.utc
                )

                versions = self.scrap_package_versions(name=data["name"], repo=repo)

                package = {
                    "name": data["name"],
                    "version": data["version"],
                    "last_modified": last_modified,
                    "url": self.ARCH_PACKAGE_URL_PATTERN.format(name=data["name"]),
                    "versions": versions,
                    "data": data,
                }
                page.append(package)
            yield page
    def get_origins_from_page(self, page: ArchListerPage) -> Iterator[ListedOrigin]:
        """Iterate on all arch pages and yield ListedOrigin instances."""
        assert self.lister_obj.id is not None

        for origin in page:
            yield ListedOrigin(
                lister_id=self.lister_obj.id,
                visit_type=self.VISIT_TYPE,
                url=origin["url"],
                last_update=origin["last_modified"],
                extra_loader_arguments={
                    "artifacts": origin["versions"],
                },
            )
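
# A minimal usage sketch (assumptions: a reachable scheduler RPC endpoint and
# the standard ``swh.scheduler.get_scheduler`` factory; the URL below is a
# placeholder):
#
#   from swh.scheduler import get_scheduler
#
#   scheduler = get_scheduler(cls="remote", url="http://localhost:5008/")
#   lister = ArchLister(scheduler=scheduler)
#   stats = lister.run()  # fetches pages and records ListedOrigin entries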