Changeset View
Changeset View
Standalone View
Standalone View
swh/lister/arch/lister.py
- This file was added.
# Copyright (C) 2022 The Software Heritage developers | |||||
# See the AUTHORS file at the top-level directory of this distribution | |||||
# License: GNU General Public License version 3, or any later version | |||||
# See top-level LICENSE file for more information | |||||
import json | |||||
import logging | |||||
from pathlib import Path | |||||
import re | |||||
import shlex | |||||
import subprocess | |||||
from typing import Any, Dict, Iterator, List | |||||
from urllib.parse import urlparse | |||||
import iso8601 | |||||
from swh.scheduler.interface import SchedulerInterface | |||||
from swh.scheduler.model import ListedOrigin | |||||
from ..pattern import CredentialsType, StatelessLister | |||||
logger = logging.getLogger(__name__) | |||||
# Aliasing the page results returned by `get_pages` method from the lister. | |||||
ArchListerPage = List[Dict[str, Any]] | |||||
def pkgbuild_parser(content: str) -> Dict[str, Any]:
    """Given the content of a PKGBUILD file (Arch Linux package build description
    file), parse it to extract relevant data.

    Options and directives of PKGBUILD file:

    * `PKGBUILD`_
    * `wiki page`_

    Args:
        content: raw text of a PKGBUILD file

    Returns:
        Dict mapping PKGBUILD keys to parsed values: a list of strings for
        ``maintainer``/``contributor`` and for bash-array keys (``arch``,
        ``license``, ``source``, checksum arrays), a string for scalar keys
        (``pkgname``, ``pkgver``, ``pkgrel``, ``pkgdesc``, ``url``; empty
        string when the key is absent).

    .. _PKGBUILD: https://archlinux.org/pacman/PKGBUILD.5.html
    .. _wiki page: https://wiki.archlinux.org/title/PKGBUILD
    """
    # Author lines are bash comments prefixed with '#'
    authors_mapping = {
        "maintainer": "# Maintainer: ",
        "contributor": "# Contributor: ",
    }
    # Corresponding values for those keys are a single string
    str_mapping = [
        "pkgname",
        "pkgver",
        "pkgrel",
        "pkgdesc",
        "url",
    ]
    # Corresponding values for those keys are a bash array, e.g. arch=('x86_64')
    list_mapping = [
        "arch",
        "license",
        "source",
        "cksums",
        "md5sums",
        "sha1sums",
        "sha224sums",
        "sha256sums",
        "sha384sums",
        "sha512sums",
        "b2sums",
    ]
    pkg: Dict = {}
    # For each mapping iterate over to match content
    for key, prefix in authors_mapping.items():
        authors_re = re.compile(rf"{prefix}\s*(.*$)", re.MULTILINE)
        pkg[key] = authors_re.findall(content)

    for key in str_mapping:
        single_re = re.compile(rf"(?<={key}=)(.+)", re.MULTILINE)
        found = single_re.findall(content)
        if not found:
            # Key absent from this PKGBUILD: default to an empty string
            # instead of raising IndexError on found[0].
            pkg[key] = ""
            continue
        # Remove at most one pair of *matching* enclosing quotes, so a value
        # that legitimately contains quotes is left intact.
        pkg[key] = re.sub(r"^([\"'])(.*)\1$", r"\2", found[0])

    for key in list_mapping:
        multiple_re = re.compile(rf"{key}+\s*=\s*\(([^)]*)\)", re.MULTILINE)
        entries: List[str] = []
        for part in multiple_re.findall(content):
            entries.extend(shlex.split(part))
        pkg[key] = entries

    # Sometimes a value is expressed through a bash variable ${var}; resolve
    # those references against the keys parsed above by substituting on the
    # JSON serialization of the whole dict.
    var_re = re.compile(r"\$\{(\w+)\}")
    var_names = var_re.findall(json.dumps(pkg))
    if var_names:
        substitutions = {}
        for name in var_names:
            # Only string values can be spliced into the serialized dict
            if name in pkg and isinstance(pkg[name], str):
                substitutions["${" + name + "}"] = pkg[name]
            else:
                # TODO: catch the value in original content
                logger.debug("Can not find a value for %s in: %s", name, pkg)
        # Guard against an empty alternation (it would match the empty string
        # at every position and raise KeyError in the replacement callback)
        if substitutions:
            pattern = re.compile(
                "|".join(re.escape(key) for key in substitutions)
            )
            # Keep the substituted values for every key, not only 'source'
            pkg = json.loads(
                pattern.sub(lambda m: substitutions[m.group()], json.dumps(pkg))
            )

    # Expand brace-grouped source entries, e.g. 'pkg.tar.gz{,.sig}', to a flat
    # list in order to be able to associate corresponding checksums
    expanded = []
    for entry in pkg["source"]:
        parts = re.compile("{(,.*)}").split(entry)
        if len(parts) > 1:
            expanded.append(parts[0])
            for ext in parts[1].split(","):
                if ext and ext.startswith("."):
                    expanded.append(f"{parts[0]}{ext}")
        else:
            expanded.append(entry)
    pkg["source"] = expanded
    return pkg
def pkgbuild_get_versions(
    repository_path: Path, pkgbuild_path: Path
) -> List[Dict[str, Any]]:
    """Retrieve all previous versions of an Arch Linux package.

    Note that Arch Linux strives to maintain the latest stable release
    versions of its software. The git repository listing PKGBUILD files do not
    have an explicit list of previous released versions of a package, just the
    latest one.

    To be able to list all previous existing versions we need to introspect the
    history of the PKGBUILD file through ``git log -L`` on its ``pkgver`` line.

    Args:
        repository_path: local checkout of the index git repository, used as
            the working directory for the git invocation
        pkgbuild_path: path of the PKGBUILD file whose history is inspected

    Returns:
        List of dicts with ``date`` (ISO commit date) and ``pkgver`` keys, one
        per pkgver change found in the history.
    """
    # Run git with an argument list and without a shell so pkgbuild_path
    # cannot be used for shell injection (it is not trusted input).
    cmd = [
        "git",
        "log",
        "--pretty=+date=%cI",
        "-p",
        "-L",
        f"^/pkgver=.*/,+1:{pkgbuild_path}",
    ]
    raw = subprocess.check_output(cmd, cwd=repository_path)
    # Keep only the lines the previous `grep '+pkgver\|+date'` pipeline kept
    filtered = "".join(
        line + "\n"
        for line in raw.decode().splitlines()
        if "+pkgver" in line or "+date" in line
    )
    rex = re.compile(
        r"(\+date)=(?P<date>.*)\n(\+pkgver)=(?P<pkgver>[\d\.]+)[\n\\\\n]",
        re.MULTILINE,
    )
    return [m.groupdict() for m in rex.finditer(filtered)]
class ArchLister(StatelessLister[ArchListerPage]):
    """List Arch linux origins from a git repository.

    It basically fetches https://github.com/archlinux/svntogit-packages.git to a
    temp directory and then walks through each PKGBUILD files.
    """

    LISTER_NAME = "arch"
    VISIT_TYPE = "arch"
    INSTANCE = "arch"

    # Git repository mirroring the official packages build descriptions
    INDEX_REPOSITORY_URL = "https://github.com/archlinux/svntogit-packages.git"
    # Local checkout destination of the index repository
    DESTINATION_PATH = Path("/tmp/svntogit-packages.git")
    # Arch Linux web API endpoint serving package metadata as json
    ARCH_API_URL_PATTERN = "https://archlinux.org/packages/{repo}/{arch}/{pkgname}/json"

    def __init__(
        self,
        scheduler: SchedulerInterface,
        credentials: CredentialsType = None,
    ):
        super().__init__(
            scheduler=scheduler,
            credentials=credentials,
            url=self.INDEX_REPOSITORY_URL,
            instance=self.INSTANCE,
        )

    def get_index_repository(self) -> None:
        """Get arch.io-index repository up to date running git command.

        Clones INDEX_REPOSITORY_URL to DESTINATION_PATH on first run, pulls
        (rebasing) on subsequent runs.
        """
        if self.DESTINATION_PATH.exists():
            subprocess.check_call(
                [
                    "git",
                    "pull",
                    "--rebase",
                    self.INDEX_REPOSITORY_URL,
                ],
                cwd=self.DESTINATION_PATH,
            )
        else:
            subprocess.check_call(
                [
                    "git",
                    "clone",
                    self.INDEX_REPOSITORY_URL,
                    self.DESTINATION_PATH,
                ]
            )

    def get_arch_index(self) -> List[Path]:
        """Build a sorted list of file paths excluding dotted directories and
        dotted files.

        Each file path corresponds to a PKGBUILD file that contains information
        about a Arch linux official package referenced as 'core', 'extra' or
        'community' repository.

        https://wiki.archlinux.org/title/Official_repositories
        """
        # Repository directories are named '<repo>-<arch>' (e.g. core-x86_64),
        # so match on the repo prefix rather than a substring anywhere.
        arch_index = sorted(
            path
            for path in self.DESTINATION_PATH.rglob("*")
            if not any(part.startswith(".") for part in path.parts)
            and path.is_file()
            and path.name == "PKGBUILD"
            and path.parent.name.startswith(("core", "extra", "community"))
        )
        return arch_index

    def get_pages(self) -> Iterator[ArchListerPage]:
        """Yield an iterator sorted by name in ascending order of pages.

        Each page is a list of Arch Linux package versions with:
          - name: Name of the package
          - version: Version
          - checksum: Checksum if any
          - pkg_file: Url to download the related package
          - last_update: Date of the last commit of the corresponding index
            file
        """
        # Fetch arch.io index repository
        self.get_index_repository()
        # Get a list of all arch files from the index repository
        arch_index = self.get_arch_index()
        logger.debug("Found %s PKGBUILD files in arch_index", len(arch_index))

        for arch in arch_index:
            page = []
            with arch.open("rb") as current_file:
                pkg = pkgbuild_parser(content=current_file.read().decode())
            versions = pkgbuild_get_versions(
                repository_path=self.DESTINATION_PATH, pkgbuild_path=arch
            )
            for version in versions:
                page.append(
                    dict(
                        pkgname=pkg["pkgname"],
                        pkgver=version["pkgver"],
                        arch=pkg["arch"][0],  # TODO: There can be more than one arch
                        repo=arch.parent.name.split("-")[0],
                        pkg=pkg["source"][0],  # TODO: There can be more than one source
                        last_update=version["date"].strip(),
                        project_url=pkg["url"],
                        raw=pkg,
                    )
                )
            yield page

    def get_origins_from_page(self, page: ArchListerPage) -> Iterator[ListedOrigin]:
        """Iterate on all arch pages and yield ListedOrigin instances."""
        assert self.lister_obj.id is not None
        url = self.ARCH_API_URL_PATTERN.format(
            repo=page[0]["repo"], arch=page[0]["arch"], pkgname=page[0]["pkgname"]
        )
        last_update = iso8601.parse_date(page[0]["last_update"])

        artifacts = []
        for version in page:
            # Actual file name of the downloadable package, derived from its url
            filename = urlparse(version["pkg"]).path.split("/")[-1]
            # Build an artifact entry following original-artifacts-json specification
            # https://docs.softwareheritage.org/devel/swh-storage/extrinsic-metadata-specification.html#original-artifacts-json  # noqa: B950
            artifact = {
                "filename": filename,
                "url": version["pkg"],
                "version": version["pkgver"],
            }
            artifact.update(version)
            artifacts.append(artifact)

        yield ListedOrigin(
            lister_id=self.lister_obj.id,
            visit_type=self.VISIT_TYPE,
            url=url,
            last_update=last_update,
            extra_loader_arguments={
                "artifacts": artifacts,
            },
        )
it would also remove both if a string is meant to contain quotes. use re.sub("""["'](.*)["']""", ...)