diff --git a/swh/loader/mercurial/hgutil.py b/swh/loader/mercurial/hgutil.py
--- a/swh/loader/mercurial/hgutil.py
+++ b/swh/loader/mercurial/hgutil.py
@@ -5,19 +5,15 @@
 
 from collections import defaultdict
 from dataclasses import dataclass
-import io
-import os
-import signal
-import time
-import traceback
+from functools import partial
 from typing import Dict, List, Mapping, NewType, Optional, Set
 
-from billiard import Process, Queue
-
 # The internal Mercurial API is not guaranteed to be stable.
 from mercurial import bookmarks, context, error, hg, smartset, util  # type: ignore
 import mercurial.ui  # type: ignore
 
+from swh.loader.core.utils import clone_with_timeout
+
 NULLID = mercurial.node.nullid
 HgNodeId = NewType("HgNodeId", bytes)
 Repository = hg.localrepo
@@ -116,64 +112,24 @@
     )
 
 
-class CloneTimeout(Exception):
-    pass
-
-
-class CloneFailure(Exception):
-    pass
-
-
-def _clone_task(src: str, dest: str, errors: Queue) -> None:
-    """Clone task to run in a subprocess.
-
-    Args:
-        src: clone source
-        dest: clone destination
-        errors: message queue to communicate errors
-    """
-    try:
-        hg.clone(
-            ui=mercurial.ui.ui.load(),
-            peeropts={},
-            source=src.encode(),
-            dest=dest.encode(),
-            update=False,
-        )
-    except Exception as e:
-        exc_buffer = io.StringIO()
-        traceback.print_exc(file=exc_buffer)
-        errors.put_nowait(exc_buffer.getvalue())
-        raise e
-
-
-def clone(src: str, dest: str, timeout: float) -> None:
-    """Clone a repository with timeout.
-
-    Args:
-        src: clone source
-        dest: clone destination
-        timeout: timeout in seconds
-    """
-    errors: Queue = Queue()
-    process = Process(target=_clone_task, args=(src, dest, errors))
-    process.start()
-    process.join(timeout)
-
-    if process.is_alive():
-        process.terminate()
-        # Give it literally a second (in successive steps of 0.1 second), then kill it.
-        # Can't use `process.join(1)` here, billiard appears to be bugged
-        # https://github.com/celery/billiard/issues/270
-        killed = False
-        for _ in range(10):
-            time.sleep(0.1)
-            if not process.is_alive():
-                break
-        else:
-            killed = True
-            os.kill(process.pid, signal.SIGKILL)
-        raise CloneTimeout(src, timeout, killed)
-
-    if not errors.empty():
-        raise CloneFailure(src, dest, errors.get())
+def clone(src: str, dest: str, timeout: float) -> None:
+    """Clone a repository with timeout.
+
+    The ``hg.clone`` call and the timeout are handed over to
+    ``swh.loader.core.utils.clone_with_timeout``.
+
+    Args:
+        src: clone source
+        dest: clone destination
+        timeout: timeout in seconds
+    """
+    # Bind all hg.clone arguments now; clone_with_timeout receives the callable.
+    closure = partial(
+        hg.clone,
+        ui=mercurial.ui.ui.load(),
+        peeropts={},
+        source=src.encode(),
+        dest=dest.encode(),
+        update=False,
+    )
+    clone_with_timeout(src, dest, closure, timeout)