Changeset View
Changeset View
Standalone View
Standalone View
swh/fuse/tests/conftest.py
Show All 10 Lines | |||||
import time | import time | ||||
from click.testing import CliRunner | from click.testing import CliRunner | ||||
import pytest | import pytest | ||||
import yaml | import yaml | ||||
from swh.fuse import cli | from swh.fuse import cli | ||||
from .api_data import API_URL, MOCK_ARCHIVE, ROOT_SWHID | from .api_data import API_URL, MOCK_ARCHIVE, ROOTDIR_SWHID, ROOTREV_SWHID | ||||
@pytest.fixture | @pytest.fixture | ||||
def web_api_mock(requests_mock): | def web_api_mock(requests_mock): | ||||
for api_call, data in MOCK_ARCHIVE.items(): | for api_call, data in MOCK_ARCHIVE.items(): | ||||
requests_mock.get(f"{API_URL}/{api_call}", text=data) | requests_mock.get(f"{API_URL}/{api_call}", text=data) | ||||
return requests_mock | return requests_mock | ||||
Show All 11 Lines | def fuse_mntdir(web_api_mock): | ||||
# Run FUSE in foreground mode but in a separate process, so it does not | # Run FUSE in foreground mode but in a separate process, so it does not | ||||
# block execution and remains easy to kill during teardown | # block execution and remains easy to kill during teardown | ||||
def fuse_process(tmpdir, tmpfile): | def fuse_process(tmpdir, tmpfile): | ||||
with tmpdir as mntdir, tmpfile as config_path: | with tmpdir as mntdir, tmpfile as config_path: | ||||
config_path = Path(config_path.name) | config_path = Path(config_path.name) | ||||
config_path.write_text(yaml.dump(config)) | config_path.write_text(yaml.dump(config)) | ||||
CliRunner().invoke( | CliRunner().invoke( | ||||
cli.mount, | cli.mount, | ||||
args=[mntdir, ROOT_SWHID, "--foreground", "--config-file", config_path], | args=[ | ||||
mntdir, | |||||
ROOTDIR_SWHID, | |||||
ROOTREV_SWHID, | |||||
"--foreground", | |||||
"--config-file", | |||||
config_path, | |||||
], | |||||
) | ) | ||||
fuse = Process(target=fuse_process, args=[tmpdir, tmpfile]) | fuse = Process(target=fuse_process, args=[tmpdir, tmpfile]) | ||||
fuse.start() | fuse.start() | ||||
# Wait max 3 seconds for the FUSE to correctly mount | # Wait max 3 seconds for the FUSE to correctly mount | ||||
for i in range(30): | for i in range(30): | ||||
try: | try: | ||||
root = listdir(tmpdir.name) | root = listdir(tmpdir.name) | ||||
Show All 12 Lines |