Changeset View
Changeset View
Standalone View
Standalone View
swh/lister/opam/lister.py
# Copyright (C) 2021 The Software Heritage developers | # Copyright (C) 2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import io | import io | ||||
import logging | import logging | ||||
import os | import os | ||||
from subprocess import PIPE, Popen, call | from subprocess import PIPE, Popen, call | ||||
import tempfile | from typing import Any, Dict, Iterator, Optional | ||||
from typing import Iterator | |||||
from swh.lister.pattern import StatelessLister | from swh.lister.pattern import StatelessLister | ||||
from swh.scheduler.interface import SchedulerInterface | from swh.scheduler.interface import SchedulerInterface | ||||
from swh.scheduler.model import ListedOrigin | from swh.scheduler.model import ListedOrigin | ||||
from ..pattern import CredentialsType | from ..pattern import CredentialsType | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
Show All 19 Lines | class OpamLister(StatelessLister[PageType]): | ||||
# Part of the lister API, that identifies this lister | # Part of the lister API, that identifies this lister | ||||
LISTER_NAME = "opam" | LISTER_NAME = "opam" | ||||
def __init__(
    self,
    scheduler: SchedulerInterface,
    url: str,
    instance: Optional[str] = None,
    credentials: CredentialsType = None,
    opam_root: str = "/tmp/opam/",
):
    """Configure the lister without running any opam command.

    Args:
        scheduler: Scheduler instance, forwarded to the base lister.
        url: URL of the opam repository to list, forwarded to the base lister.
        instance: Name of the opam repository, forwarded to the base lister
            (how a ``None`` value is resolved is up to the base class).
        credentials: Optional credentials, forwarded to the base lister.
        opam_root: Folder used as the opam root by the ``opam`` commands.
    """
    super().__init__(
        scheduler=scheduler,
        credentials=credentials,
        url=url,
        instance=instance,
    )
    # The opam root folder is only initialized in ``get_pages``: keeping the
    # constructor free of side effects makes the lister cheap to instantiate.
    self.opam_root = opam_root
    # Private copy of the process environment handed to every opam subprocess
    self.env = os.environ.copy()
def get_pages(self) -> Iterator[PageType]:
    """Yield every line printed by ``opam list`` for the configured repository.

    The opam root folder is initialized first (see ``opam_init``), then
    ``opam list`` is run against it and its standard output is streamed line by
    line. Each yielded page is one output line without its trailing newline.

    Yields:
        One page per output line of the ``opam list`` command.
    """
    # Initialize the opam root directory
    opam_init(self.opam_root, self.instance, self.url, self.env)

    command = [
        "opam",
        "list",
        "--all",
        "--no-switch",
        "--safe",
        "--repos",
        self.instance,
        "--root",
        self.opam_root,
        "--normalise",
        "--short",
    ]

    # Actually list opam instance data. Using Popen as a context manager
    # guarantees the pipe is closed and the child process is waited for, even
    # when the consumer stops iterating early, so no zombie process is left.
    with Popen(command, env=self.env, stdout=PIPE) as proc:
        if proc.stdout is not None:
            for line in io.TextIOWrapper(proc.stdout):
                yield line.rstrip("\n")

    # The context manager has waited for the process, returncode is now set
    if proc.returncode:
        logger.warning(
            "opam list exited with status %s for instance %s",
            proc.returncode,
            self.instance,
        )
def get_origins_from_page(self, page: PageType) -> Iterator[ListedOrigin]:
    """Convert a page of OpamLister repositories into a list of ListedOrigins

    Args:
        page: A page as yielded by ``get_pages``, i.e. a single package name.

    Yields:
        Exactly one ``ListedOrigin`` of visit type ``opam`` for the package.
    """
    lister_id = self.lister_obj.id
    assert lister_id is not None

    # Everything the loader needs to find this package in the opam root
    loader_arguments = {
        "opam_root": self.opam_root,
        "opam_instance": self.instance,
        "opam_url": self.url,
        "opam_package": page,
    }

    # a page is just a package name
    yield ListedOrigin(
        lister_id=lister_id,
        visit_type="opam",
        url=f"opam+{self.url}/packages/{page}/",
        last_update=None,
        extra_loader_arguments=loader_arguments,
    )
def opam_init(opam_root: str, instance: str, url: str, env: Dict[str, Any]) -> None:
    """Initialize an opam_root folder.

    When ``opam_root`` does not exist or is empty, it is created with
    ``opam init``. Otherwise the repository is registered in the existing root
    with ``opam repository add``.

    A non-zero exit status of the opam command is logged as a warning rather
    than raised, so callers keep the best-effort behavior they relied on.

    Args:
        opam_root: The opam root folder to initialize
        instance: Name of the opam repository to add or initialize
        url: The associated url of the opam repository to add or initialize
        env: The global environment to use for the opam command.

    Returns:
        None.

    """
    if not os.path.exists(opam_root) or not os.listdir(opam_root):
        command = [
            "opam",
            "init",
            "--reinit",
            "--bare",
            "--no-setup",
            "--root",
            opam_root,
            instance,
            url,
        ]
    else:
        # The repository exists and is populated, we just add another instance in the
        # repository. If it's already setup, it's a noop
        command = [
            "opam",
            "repository",
            "add",
            "--root",
            opam_root,
            instance,
            url,
        ]

    # Actually execute the command, and make failures visible instead of
    # silently discarding the exit status
    returncode = call(command, env=env)
    if returncode != 0:
        logger.warning(
            "Command '%s' exited with status %s", " ".join(command), returncode
        )