Changeset View
Changeset View
Standalone View
Standalone View
swh/lister/opam/lister.py
# Copyright (C) 2021 The Software Heritage developers | # Copyright (C) 2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import io | import io | ||||
import logging | import logging | ||||
import os | import os | ||||
from subprocess import PIPE, Popen, call | from subprocess import PIPE, Popen, call | ||||
import tempfile | |||||
from typing import Iterator, Optional | from typing import Iterator, Optional | ||||
from swh.lister.pattern import StatelessLister | from swh.lister.pattern import StatelessLister | ||||
from swh.scheduler.interface import SchedulerInterface | from swh.scheduler.interface import SchedulerInterface | ||||
from swh.scheduler.model import ListedOrigin | from swh.scheduler.model import ListedOrigin | ||||
from ..pattern import CredentialsType | from ..pattern import CredentialsType | ||||
Show All 22 Lines | class OpamLister(StatelessLister[PageType]): | ||||
LISTER_NAME = "opam" | LISTER_NAME = "opam" | ||||
def __init__( | def __init__( | ||||
self, | self, | ||||
scheduler: SchedulerInterface, | scheduler: SchedulerInterface, | ||||
url: str, | url: str, | ||||
instance: Optional[str] = None, | instance: Optional[str] = None, | ||||
credentials: CredentialsType = None, | credentials: CredentialsType = None, | ||||
opam_root: str = "/tmp/opam/", | |||||
): | ): | ||||
super().__init__( | super().__init__( | ||||
scheduler=scheduler, credentials=credentials, url=url, instance=instance, | scheduler=scheduler, credentials=credentials, url=url, instance=instance, | ||||
) | ) | ||||
self.env = os.environ.copy() | self.env = os.environ.copy() | ||||
# Opam root folder is initialized in the :meth:`get_pages` method as no | # Opam root folder is initialized in the :meth:`get_pages` method as no | ||||
# side-effect should happen in the constructor to ease instantiation | # side-effect should happen in the constructor to ease instantiation | ||||
self.opamroot = tempfile.mkdtemp(prefix="swh_opam_lister") | self.opamroot = os.path.join(opam_root, self.instance) | ||||
def get_pages(self) -> Iterator[PageType]: | def get_pages(self) -> Iterator[PageType]: | ||||
# Initialize the opam root directory with the opam instance data to list. | # Initialize the opam root directory with the opam instance data to list. | ||||
call( | call( | ||||
[ | [ | ||||
"opam", | "opam", | ||||
"init", | "init", | ||||
"--reinit", | "--reinit", | ||||
▲ Show 20 Lines • Show All 47 Lines • Show Last 20 Lines |