Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7123559
D1978.id6669.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
6 KB
Subscribers
None
D1978.id6669.diff
View Options
diff --git a/swh/loader/mercurial/loader.py b/swh/loader/mercurial/loader.py
--- a/swh/loader/mercurial/loader.py
+++ b/swh/loader/mercurial/loader.py
@@ -19,9 +19,12 @@
import datetime
import hglib
+import multiprocessing
import os
+from queue import Empty
import random
import re
+import time
from dateutil import parser
from shutil import rmtree
@@ -50,6 +53,10 @@
HEAD_POINTER_NAME = b'tip'
+class CloneTimeoutError(Exception):
+ pass
+
+
class HgBundle20Loader(UnbufferedLoader):
"""Mercurial loader able to deal with remote or local repository.
@@ -62,6 +69,7 @@
'temp_directory': ('str', '/tmp'),
'cache1_size': ('int', 800*1024*1024),
'cache2_size': ('int', 800*1024*1024),
+ 'clone_timeout_seconds': ('int', 7200),
}
visit_type = 'hg'
@@ -75,6 +83,7 @@
self.temp_directory = self.config['temp_directory']
self.cache1_size = self.config['cache1_size']
self.cache2_size = self.config['cache2_size']
+ self.clone_timeout = self.config['clone_timeout_seconds']
self.working_directory = None
self.bundle_path = None
@@ -129,6 +138,43 @@
self.last_visit = self.storage.origin_visit_get_latest(
self.origin['url'], require_snapshot=True)
+ @staticmethod
+ def clone_with_timeout(log, origin, destination, timeout):
+ queue = multiprocessing.Queue()
+ start = time.monotonic()
+
+ def do_clone(queue, origin, destination):
+ try:
+ result = hglib.clone(source=origin, dest=destination)
+ except BaseException as e:
+ queue.put(e)
+ else:
+ queue.put(result)
+
+ process = multiprocessing.Process(target=do_clone,
+ args=(queue, origin, destination))
+ process.start()
+
+ while True:
+ try:
+ result = queue.get(timeout=0.1)
+ break
+ except Empty:
+ duration = time.monotonic() - start
+ if timeout and duration > timeout:
+ log.warning('Timeout cloning `%s` within %s seconds',
+ origin, timeout)
+ process.terminate()
+ process.join()
+ raise CloneTimeoutError(origin, timeout)
+ continue
+
+ process.join()
+ if isinstance(result, Exception):
+ raise result from None
+
+ return result
+
def prepare(self, *, origin_url, visit_date, directory=None):
"""Prepare the necessary steps to load an actual remote or local
repository.
@@ -158,9 +204,12 @@
os.makedirs(self.working_directory, exist_ok=True)
self.hgdir = self.working_directory
- self.log.debug('Cloning %s to %s' % (
- self.origin['url'], self.hgdir))
- hglib.clone(source=self.origin['url'], dest=self.hgdir)
+ self.log.debug('Cloning %s to %s with timeout %s seconds',
+ self.origin['url'], self.hgdir, self.clone_timeout)
+
+ self.clone_with_timeout(self.log, self.origin['url'], self.hgdir,
+ self.clone_timeout)
+
else: # local repository
self.working_directory = None
self.hgdir = directory
diff --git a/swh/loader/mercurial/tests/common.py b/swh/loader/mercurial/tests/common.py
--- a/swh/loader/mercurial/tests/common.py
+++ b/swh/loader/mercurial/tests/common.py
@@ -16,6 +16,7 @@
'content_packet_size_bytes': 1073741824,
'content_size_limit': 104857600,
'directory_packet_size': 25000,
+ 'clone_timeout_seconds': 2 * 3600,
'log_db': 'dbname=softwareheritage-log',
'occurrence_packet_size': 100000,
'reduce_effort': False,
diff --git a/swh/loader/mercurial/tests/test_loader.py b/swh/loader/mercurial/tests/test_loader.py
--- a/swh/loader/mercurial/tests/test_loader.py
+++ b/swh/loader/mercurial/tests/test_loader.py
@@ -3,14 +3,20 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import logging
import os
+import time
from unittest.mock import patch
+import hglib
+import pytest
+
from swh.loader.core.tests import BaseLoaderTest
from swh.storage.algos.snapshot import snapshot_get_all_branches
from .common import HgLoaderMemoryStorage, HgArchiveLoaderMemoryStorage
+from ..loader import HgBundle20Loader, CloneTimeoutError
class BaseHgLoaderTest(BaseLoaderTest):
@@ -311,3 +317,52 @@
self.assertTrue(len(hg_changesets) > 0)
self.assertTrue(len(transplant_sources) > 0)
self.assertTrue(transplant_sources.issubset(hg_changesets))
+
+
+def test_clone_with_timeout_timeout(caplog, tmp_path, monkeypatch):
+ log = logging.getLogger('test_clone_with_timeout')
+
+ def clone_timeout(source, dest):
+ time.sleep(60)
+
+ monkeypatch.setattr(hglib, "clone", clone_timeout)
+
+ with pytest.raises(CloneTimeoutError):
+ HgBundle20Loader.clone_with_timeout(
+ log, 'https://www.mercurial-scm.org/repo/hello', tmp_path, 1
+ )
+
+ for record in caplog.records:
+ assert record.levelname == 'WARNING'
+ assert (
+ 'https://www.mercurial-scm.org/repo/hello' in record.getMessage()
+ )
+ assert record.args == ('https://www.mercurial-scm.org/repo/hello', 1)
+
+
+def test_clone_with_timeout_returns(caplog, tmp_path, monkeypatch):
+ log = logging.getLogger('test_clone_with_timeout')
+
+ def clone_return(source, dest):
+ return (source, dest)
+
+ monkeypatch.setattr(hglib, "clone", clone_return)
+
+ assert HgBundle20Loader.clone_with_timeout(
+ log, 'https://www.mercurial-scm.org/repo/hello', tmp_path, 1
+ ) == ('https://www.mercurial-scm.org/repo/hello', tmp_path)
+
+
+def test_clone_with_timeout_exception(caplog, tmp_path, monkeypatch):
+ log = logging.getLogger('test_clone_with_timeout')
+
+ def clone_return(source, dest):
+ raise ValueError('Test exception')
+
+ monkeypatch.setattr(hglib, "clone", clone_return)
+
+ with pytest.raises(ValueError) as excinfo:
+ HgBundle20Loader.clone_with_timeout(
+ log, 'https://www.mercurial-scm.org/repo/hello', tmp_path, 1
+ )
+ assert 'Test exception' in excinfo.value.args[0]
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Thu, Dec 19, 10:53 AM (19 h, 36 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3215988
Attached To
D1978: Implement timeouts for cloning repositories
Event Timeline
Log In to Comment