diff --git a/swh/loader/mercurial/loader.py b/swh/loader/mercurial/loader.py
--- a/swh/loader/mercurial/loader.py
+++ b/swh/loader/mercurial/loader.py
@@ -19,9 +19,12 @@
 
 import datetime
 import hglib
+import multiprocessing
 import os
+from queue import Empty
 import random
 import re
+import time
 
 from dateutil import parser
 from shutil import rmtree
@@ -50,6 +53,10 @@
 HEAD_POINTER_NAME = b'tip'
 
 
+class CloneTimeoutError(Exception):
+    """Raised when cloning a repository exceeds its time limit."""
+
+
 class HgBundle20Loader(UnbufferedLoader):
     """Mercurial loader able to deal with remote or local repository.
 
@@ -62,6 +69,7 @@
         'temp_directory': ('str', '/tmp'),
         'cache1_size': ('int', 800*1024*1024),
         'cache2_size': ('int', 800*1024*1024),
+        'clone_timeout_seconds': ('int', 7200),
     }
 
     visit_type = 'hg'
@@ -75,6 +83,7 @@
         self.temp_directory = self.config['temp_directory']
         self.cache1_size = self.config['cache1_size']
         self.cache2_size = self.config['cache2_size']
+        self.clone_timeout = self.config['clone_timeout_seconds']
         self.working_directory = None
         self.bundle_path = None
 
@@ -129,6 +138,85 @@
         self.last_visit = self.storage.origin_visit_get_latest(
             self.origin['url'], require_snapshot=True)
 
+    @staticmethod
+    def clone_with_timeout(log, origin, destination, timeout):
+        """Clone `origin` into `destination` in a child process.
+
+        Args:
+            log: logger used to report a timeout
+            origin: URL or path of the repository to clone
+            destination: directory to clone the repository into
+            timeout: maximum duration of the clone, in seconds; a falsy
+              value (0 or None) disables the timeout
+
+        Returns:
+            the value returned by ``hglib.clone`` in the child process
+
+        Raises:
+            CloneTimeoutError: if the clone does not complete in time
+            RuntimeError: if the child process exits without reporting
+              a result (e.g. it crashed, was killed, or its result could
+              not be pickled)
+            any exception raised by ``hglib.clone`` in the child process
+
+        """
+        queue = multiprocessing.Queue()
+        start = time.monotonic()
+
+        # NOTE: a nested function cannot be pickled, so this relies on
+        # the 'fork' start method of multiprocessing.
+        def do_clone(queue, origin, destination):
+            try:
+                result = hglib.clone(source=origin, dest=destination)
+            except BaseException as e:
+                queue.put(e)
+            else:
+                queue.put(result)
+
+        process = multiprocessing.Process(target=do_clone,
+                                          args=(queue, origin, destination))
+        process.start()
+
+        finished = False
+        try:
+            while True:
+                try:
+                    result = queue.get(timeout=0.1)
+                    break
+                except Empty:
+                    pass
+
+                if not process.is_alive():
+                    # A child flushes its queue before exiting: anything it
+                    # managed to send is readable by now.
+                    try:
+                        result = queue.get(timeout=0.1)
+                        break
+                    except Empty:
+                        raise RuntimeError(
+                            'Cloning `%s` failed: child process exited with '
+                            'code %s without a result'
+                            % (origin, process.exitcode)) from None
+
+                duration = time.monotonic() - start
+                if timeout and duration > timeout:
+                    log.warning('Timeout cloning `%s` within %s seconds',
+                                origin, timeout)
+                    raise CloneTimeoutError(origin, timeout)
+            finished = True
+        finally:
+            # Never leave a clone running in the background, whatever
+            # made us leave the loop (timeout, interruption, ...).
+            if not finished and process.is_alive():
+                process.terminate()
+            process.join()
+            queue.close()
+
+        if isinstance(result, BaseException):
+            raise result from None
+
+        return result
+
     def prepare(self, *, origin_url, visit_date, directory=None):
         """Prepare the necessary steps to load an actual remote or
         local repository.
@@ -158,9 +246,12 @@
             os.makedirs(self.working_directory, exist_ok=True)
             self.hgdir = self.working_directory
 
-            self.log.debug('Cloning %s to %s' % (
-                self.origin['url'], self.hgdir))
-            hglib.clone(source=self.origin['url'], dest=self.hgdir)
+            self.log.debug('Cloning %s to %s with timeout %s seconds',
+                           self.origin['url'], self.hgdir, self.clone_timeout)
+
+            self.clone_with_timeout(self.log, self.origin['url'], self.hgdir,
+                                    self.clone_timeout)
+
         else:  # local repository
             self.working_directory = None
             self.hgdir = directory
diff --git a/swh/loader/mercurial/tests/common.py b/swh/loader/mercurial/tests/common.py
--- a/swh/loader/mercurial/tests/common.py
+++ b/swh/loader/mercurial/tests/common.py
@@ -16,6 +16,7 @@
     'content_packet_size_bytes': 1073741824,
     'content_size_limit': 104857600,
     'directory_packet_size': 25000,
+    'clone_timeout_seconds': 2 * 3600,
     'log_db': 'dbname=softwareheritage-log',
     'occurrence_packet_size': 100000,
     'reduce_effort': False,
diff --git a/swh/loader/mercurial/tests/test_loader.py b/swh/loader/mercurial/tests/test_loader.py
--- a/swh/loader/mercurial/tests/test_loader.py
+++ b/swh/loader/mercurial/tests/test_loader.py
@@ -3,14 +3,20 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+import logging
 import os
+import time
 
 from unittest.mock import patch
 
+import hglib
+import pytest
+
 from swh.loader.core.tests import BaseLoaderTest
 from swh.storage.algos.snapshot import snapshot_get_all_branches
 
 from .common import HgLoaderMemoryStorage, HgArchiveLoaderMemoryStorage
+from ..loader import HgBundle20Loader, CloneTimeoutError
 
 
 class BaseHgLoaderTest(BaseLoaderTest):
@@ -311,3 +317,69 @@
         self.assertTrue(len(hg_changesets) > 0)
         self.assertTrue(len(transplant_sources) > 0)
         self.assertTrue(transplant_sources.issubset(hg_changesets))
+
+
+def test_clone_with_timeout_timeout(caplog, tmp_path, monkeypatch):
+    log = logging.getLogger('test_clone_with_timeout')
+
+    def clone_timeout(source, dest):
+        time.sleep(60)
+
+    monkeypatch.setattr(hglib, "clone", clone_timeout)
+
+    with pytest.raises(CloneTimeoutError):
+        HgBundle20Loader.clone_with_timeout(
+            log, 'https://www.mercurial-scm.org/repo/hello', tmp_path, 1
+        )
+
+    assert caplog.records
+    for record in caplog.records:
+        assert record.levelname == 'WARNING'
+        assert (
+            'https://www.mercurial-scm.org/repo/hello' in record.getMessage()
+        )
+        assert record.args == ('https://www.mercurial-scm.org/repo/hello', 1)
+
+
+def test_clone_with_timeout_returns(caplog, tmp_path, monkeypatch):
+    log = logging.getLogger('test_clone_with_timeout')
+
+    def clone_return(source, dest):
+        return (source, dest)
+
+    monkeypatch.setattr(hglib, "clone", clone_return)
+
+    assert HgBundle20Loader.clone_with_timeout(
+        log, 'https://www.mercurial-scm.org/repo/hello', tmp_path, 1
+    ) == ('https://www.mercurial-scm.org/repo/hello', tmp_path)
+
+
+def test_clone_with_timeout_exception(caplog, tmp_path, monkeypatch):
+    log = logging.getLogger('test_clone_with_timeout')
+
+    def clone_return(source, dest):
+        raise ValueError('Test exception')
+
+    monkeypatch.setattr(hglib, "clone", clone_return)
+
+    with pytest.raises(ValueError) as excinfo:
+        HgBundle20Loader.clone_with_timeout(
+            log, 'https://www.mercurial-scm.org/repo/hello', tmp_path, 1
+        )
+    assert 'Test exception' in excinfo.value.args[0]
+
+
+def test_clone_with_timeout_child_dies(tmp_path, monkeypatch):
+    log = logging.getLogger('test_clone_with_timeout')
+
+    def clone_die(source, dest):
+        # exit abruptly, without reporting anything to the parent
+        os._exit(1)
+
+    monkeypatch.setattr(hglib, "clone", clone_die)
+
+    # must report the dead child, not wait for and report a timeout
+    with pytest.raises(RuntimeError):
+        HgBundle20Loader.clone_with_timeout(
+            log, 'https://www.mercurial-scm.org/repo/hello', tmp_path, 10
+        )