diff --git a/swh/loader/package/opam/loader.py b/swh/loader/package/opam/loader.py
--- a/swh/loader/package/opam/loader.py
+++ b/swh/loader/package/opam/loader.py
@@ -9,8 +9,10 @@
 from typing import Iterator, List, Optional, Tuple
 
 import attr
+from debian.deb822 import Dsc
 
 from swh.loader.package.loader import BasePackageInfo, PackageLoader
+from swh.loader.package.utils import cached_method
 from swh.model.model import Person, Revision, RevisionType, Sha1Git
 from swh.storage.interface import StorageInterface
 
@@ -84,6 +86,41 @@
         self.opam_url = opam_url
         self.opam_package = opam_package
 
+    def get_package_dir(self) -> str:
+        return (
+            f"{self.opam_root}/repo/{self.opam_instance}/packages/{self.opam_package}"
+        )
+
+    def get_package_file(self, version: str) -> str:
+        return f"{self.get_package_dir()}/{self.opam_package}.{version}/opam"
+
+    @cached_method
+    def _compute_versions(self) -> List[str]:
+        """Compute the versions using opam internals
+
+        Raises:
+            ValueError in case the lister is not able to determine the list of versions
+
+        Returns:
+            The list of versions for the package
+
+
+        """
+        # HACK using opam internals (opam < 2.1) to list current package versions. We
+        # need `opam show` to support the --repo flag but it does not currently so we
+        # work around it.
+        package_dir = self.get_package_dir()
+        if not os.path.exists(package_dir):
+            raise ValueError(
+                f"can't get versions for package {self.opam_package} "
+                f"(at url {self.url}) from `opam show`"
+            )
+        versions = [
+            ".".join(version.split(".")[1:]) for version in os.listdir(package_dir)
+        ]
+        versions.sort()
+        return versions
+
     def get_versions(self) -> List[str]:
         """First initialize the opam root directory if needed the start listing the
         package versions.
@@ -109,77 +146,76 @@
         elif not os.path.isfile(os.path.join(self.opam_root, "config")):
             raise ValueError("invalid opam root")
 
-        versions = opam_read(
-            [
-                "opam",
-                "show",
-                "--color",
-                "never",
-                "--normalise",
-                "--root",
-                self.opam_root,
-                "-f",
-                "all-versions",
-                self.opam_package,
-            ],
-            init_error_msg_if_any=(
+        return self._compute_versions()
+
+    def get_default_version(self) -> str:
+        versions = self._compute_versions()
+        if not versions:
+            raise ValueError(
                 f"can't get versions for package {self.opam_package} "
                 f"(at url {self.url}) from `opam show`"
-            ),
-        )
-        return versions.split() if versions else []
+            )
+        return versions[0]
 
-    def get_default_version(self) -> str:
+    def _get_metadata(self, version):
+        """Retrieve internal package metadata for a given version and parse the result.
 
-        init_error_msg = f"can't get default version for package {self.opam_package} \
-        (at url {self.url}) from `opam show`"
-        # we only care about the first element of the first line
-        # which is the initial version
-        versions_ = opam_read(
-            [
-                "opam",
-                "show",
-                "--color",
-                "never",
-                "--normalise",
-                "--root",
-                self.opam_root,
-                "-f",
-                "version",
-                self.opam_package,
-            ],
-            init_error_msg_if_any=init_error_msg,
-        )
-        if not versions_:
-            raise ValueError(init_error_msg)
-        versions = versions_.split()
-        if len(versions) != 1:
-            raise ValueError(init_error_msg)
-        return versions[0]
+        """
+        package_file = self.get_package_file(version)  # that file is a debian like one
+        with open(package_file, "rb") as dsc:
+            parsed_dsc = Dsc(dsc)
+        return parsed_dsc
 
     def get_enclosed_single_line_field(self, field, version) -> Optional[str]:
-        result = opam_read(
-            [
-                "opam",
-                "show",
-                "--color",
-                "never",
-                "--normalise",
-                "--root",
-                self.opam_root,
-                "-f",
-                field,
-                f"{self.opam_package}.{version}",
-            ]
-        )
+        """Retrieve metadata information for the 'version'ned package using the 'field'
+        key.
 
-        # this needs to be cleaned up a bit (remove enclosing " and the trailing \n)
-        return result[1:-2] if result else None
+        Raises:
+            ValueError: if 'field' is unsupported or its value cannot be parsed.
+
+        """
+        import ast  # stdlib; imported here since only this method needs it
+
+        if field == "url.src":
+            # That field is formatted differently from the remaining part of the file
+            # It's somewhat looking like a python dict as the last part of the file
+            # ```
+            # ...
+            # url {
+            #   src: "https://github.com/OCamlPro/agrid/archive/0.1.tar.gz"
+            #   checksum: [
+            #     "sha256=ea82546711a6abdd4edf8bc3052041498cae9c2e5a9e147e29820da4e..."
+            #     "sha512=f53b2c095e3607e53f92d4e7e13848e9e34bd866837335e7d9341dbb4..."
+            #   ]
+            # }
+            # ```
+            package_file = self.get_package_file(version)
+            with open(package_file, "r") as f:
+                src = [line for line in f if "src: " in line]
+            if len(src) != 1:
+                raise ValueError(
+                    f"expected exactly one 'src:' line in {package_file}, "
+                    f"found {len(src)}"
+                )
+            header, url = src[0].split()
+            if header != "src:":
+                raise ValueError(f"unexpected 'src:' line in {package_file}")
+            return url.strip('"')  # result is '"quoted"'
+        if field not in ("authors", "maintainer"):
+            raise ValueError(f"unsupported opam field {field!r}")
+        opam_metadata = self._get_metadata(version)
+        # The value is a list literal stored as a string; literal_eval parses it
+        # without executing arbitrary code from the package file (unlike eval).
+        result = ast.literal_eval(opam_metadata[field])
+        if not isinstance(result, list):
+            raise ValueError(
+                f"field {field!r} of {self.opam_package}.{version} is not a list"
+            )
+        return "".join(result)
 
     def get_package_info(self, version: str) -> Iterator[Tuple[str, OpamPackageInfo]]:
 
-        branch_name = f"{self.opam_package}.{version}"
-        url = self.get_enclosed_single_line_field("url.src:", version)
+        url = self.get_enclosed_single_line_field("url.src", version)
 
         if url is None:
             raise ValueError(
@@ -187,11 +223,11 @@
             (at url {self.url}) from `opam show`"
             )
 
-        authors_field = self.get_enclosed_single_line_field("authors:", version)
+        authors_field = self.get_enclosed_single_line_field("authors", version)
         fullname = b"" if authors_field is None else str.encode(authors_field)
         author = Person(fullname=fullname, name=None, email=None)
 
-        maintainer_field = self.get_enclosed_single_line_field("maintainer:", version)
+        maintainer_field = self.get_enclosed_single_line_field("maintainer", version)
         fullname = b"" if maintainer_field is None else str.encode(maintainer_field)
         committer = Person(fullname=fullname, name=None, email=None)
 