diff --git a/swh/loader/mercurial/loader.py b/swh/loader/mercurial/loader.py
index 2cf8327..11c3bb6 100644
--- a/swh/loader/mercurial/loader.py
+++ b/swh/loader/mercurial/loader.py
@@ -1,549 +1,598 @@
 # Copyright (C) 2017-2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 """This document contains a SWH loader for ingesting repository data
 from Mercurial version 2 bundle files.
 """
 
 # NOTE: The code here does expensive work twice in places because of the
 # intermediate need to check for what is missing before sending to the database
 # and the desire to not juggle very large amounts of data.
 
 # TODO: Decide whether to also serialize to disk and read back more quickly
 # from there. Maybe only for very large repos and fast drives.
 # - Avi
 
 import datetime
 import hglib
+import multiprocessing
 import os
+from queue import Empty
 import random
 import re
+import time
 
 from dateutil import parser
 from shutil import rmtree
 from tempfile import mkdtemp
 
 from swh.model import identifiers
 from swh.model.hashutil import (
     MultiHash, hash_to_hex, hash_to_bytes, DEFAULT_ALGORITHMS
 )
 from swh.loader.core.loader import UnbufferedLoader
 from swh.loader.core.converters import content_for_storage
 from swh.loader.core.utils import clean_dangling_folders
 
 from . import converters
 from .archive_extract import tmp_extract
 from .bundle20_reader import Bundle20Reader
 from .converters import PRIMARY_ALGO as ALGO
 from .objects import SelectiveCache, SimpleTree
 
 TAG_PATTERN = re.compile('[0-9A-Fa-f]{40}')
 TEMPORARY_DIR_PREFIX_PATTERN = 'swh.loader.mercurial.'
 HEAD_POINTER_NAME = b'tip'
 
 
+class CloneTimeoutError(Exception):
+    pass
+
+
 class HgBundle20Loader(UnbufferedLoader):
     """Mercurial loader able to deal with remote or local repository.
 
     """
     CONFIG_BASE_FILENAME = 'loader/mercurial'
 
     ADDITIONAL_CONFIG = {
         'bundle_filename': ('str', 'HG20_none_bundle'),
         'reduce_effort': ('bool', False),
         'temp_directory': ('str', '/tmp'),
         'cache1_size': ('int', 800*1024*1024),
         'cache2_size': ('int', 800*1024*1024),
+        'clone_timeout_seconds': ('int', 7200),
     }
 
     visit_type = 'hg'
 
     def __init__(self, logging_class='swh.loader.mercurial.Bundle20Loader'):
         super().__init__(logging_class=logging_class)
         self.content_max_size_limit = self.config['content_size_limit']
         self.bundle_filename = self.config['bundle_filename']
         self.reduce_effort_flag = self.config['reduce_effort']
         self.empty_repository = None
         self.temp_directory = self.config['temp_directory']
         self.cache1_size = self.config['cache1_size']
         self.cache2_size = self.config['cache2_size']
+        self.clone_timeout = self.config['clone_timeout_seconds']
         self.working_directory = None
         self.bundle_path = None
 
     def pre_cleanup(self):
         """Cleanup potential dangling files from prior runs (e.g.
         OOM killed tasks)
 
         """
         clean_dangling_folders(self.temp_directory,
                                pattern_check=TEMPORARY_DIR_PREFIX_PATTERN,
                                log=self.log)
 
     def cleanup(self):
         """Clean temporary working directory
 
         """
         if self.bundle_path and os.path.exists(self.bundle_path):
             self.log.debug('Cleanup up working bundle %s' % self.bundle_path)
             os.unlink(self.bundle_path)
         if self.working_directory and os.path.exists(self.working_directory):
             self.log.debug('Cleanup up working directory %s' % (
                 self.working_directory, ))
             rmtree(self.working_directory)
 
     def get_heads(self, repo):
         """Read the closed branches heads (branch, bookmarks) and returns a
            dict with key the branch_name (bytes) and values the tuple
            (pointer nature (bytes), mercurial's node id (bytes)).
            Those needs conversion to swh-ids.
 
            This is taken care of in get_revisions.
 
         """
         b = {}
         for _, node_hash_id, pointer_nature, branch_name, *_ in repo.heads():
             b[branch_name] = (
                 pointer_nature, hash_to_bytes(node_hash_id.decode()))
         bookmarks = repo.bookmarks()
         if bookmarks and bookmarks[0]:
             for bookmark_name, _, target_short in bookmarks[0]:
                 target = repo[target_short].node()
                 b[bookmark_name] = (None, hash_to_bytes(target.decode()))
         return b
 
     def prepare_origin_visit(self, *, origin_url, visit_date, **kwargs):
         self.origin_url = origin_url
         self.origin = {'url': self.origin_url, 'type': self.visit_type}
         if isinstance(visit_date, str):  # visit_date can be string or datetime
             visit_date = parser.parse(visit_date)
         self.visit_date = visit_date
         self.last_visit = self.storage.origin_visit_get_latest(
             self.origin['url'], require_snapshot=True)
 
+    @staticmethod
+    def clone_with_timeout(log, origin, destination, timeout):
+        queue = multiprocessing.Queue()
+        start = time.monotonic()
+
+        def do_clone(queue, origin, destination):
+            try:
+                result = hglib.clone(source=origin, dest=destination)
+            except BaseException as e:
+                queue.put(e)
+            else:
+                queue.put(result)
+
+        process = multiprocessing.Process(target=do_clone,
+                                          args=(queue, origin, destination))
+        process.start()
+
+        while True:
+            try:
+                result = queue.get(timeout=0.1)
+                break
+            except Empty:
+                duration = time.monotonic() - start
+                if timeout and duration > timeout:
+                    log.warning('Timeout cloning `%s` within %s seconds',
+                                origin, timeout)
+                    process.terminate()
+                    process.join()
+                    raise CloneTimeoutError(origin, timeout)
+                continue
+
+        process.join()
+        if isinstance(result, Exception):
+            raise result from None
+
+        return result
+
     def prepare(self, *, origin_url, visit_date, directory=None):
         """Prepare the necessary steps to load an actual remote or local
            repository.  To load a local repository, pass the optional
            directory parameter as filled with a path to a real local folder.
 
            To load a remote repository, pass the optional directory
            parameter as None.
 
         Args:
             origin_url (str): Origin url to load
             visit_date (str/datetime): Date of the visit
             directory (str/None): The local directory to load
 
         """
         self.branches = {}
         self.tags = []
         self.releases = {}
         self.node_2_rev = {}
 
         if not directory:  # remote repository
             self.working_directory = mkdtemp(
                 prefix=TEMPORARY_DIR_PREFIX_PATTERN,
                 suffix='-%s' % os.getpid(),
                 dir=self.temp_directory)
             os.makedirs(self.working_directory, exist_ok=True)
             self.hgdir = self.working_directory
 
-            self.log.debug('Cloning %s to %s' % (
-                self.origin['url'], self.hgdir))
-            hglib.clone(source=self.origin['url'], dest=self.hgdir)
+            self.log.debug('Cloning %s to %s with timeout %s seconds',
+                           self.origin['url'], self.hgdir, self.clone_timeout)
+
+            self.clone_with_timeout(self.log, self.origin['url'], self.hgdir,
+                                    self.clone_timeout)
+
         else:  # local repository
             self.working_directory = None
             self.hgdir = directory
 
         self.bundle_path = os.path.join(self.hgdir, self.bundle_filename)
         self.log.debug('Bundling at %s' % self.bundle_path)
 
         with hglib.open(self.hgdir) as repo:
             self.heads = self.get_heads(repo)
             repo.bundle(bytes(self.bundle_path, 'utf-8'),
                         all=True,
                         type=b'none-v2')
 
         self.cache_filename1 = os.path.join(
             self.hgdir, 'swh-cache-1-%s' % (
                 hex(random.randint(0, 0xffffff))[2:], ))
         self.cache_filename2 = os.path.join(
             self.hgdir, 'swh-cache-2-%s' % (
                 hex(random.randint(0, 0xffffff))[2:], ))
 
         try:
             self.br = Bundle20Reader(bundlefile=self.bundle_path,
                                      cache_filename=self.cache_filename1,
                                      cache_size=self.cache1_size)
         except FileNotFoundError:
             # Empty repository! Still a successful visit targeting an
             # empty snapshot
             self.log.warn('%s is an empty repository!' % self.hgdir)
             self.empty_repository = True
         else:
             self.reduce_effort = set()
             if self.reduce_effort_flag:
                 now = datetime.datetime.now(tz=datetime.timezone.utc)
                 if (now - self.visit_date).days > 1:
                     # Assuming that self.visit_date would be today for
                     # a new visit, treat older visit dates as
                     # indication of wanting to skip some processing
                     # effort.
                     for header, commit in self.br.yield_all_changesets():
                         ts = commit['time'].timestamp()
                         if ts < self.visit_date.timestamp():
                             self.reduce_effort.add(header['node'])
 
     def has_contents(self):
         return not self.empty_repository
 
     def has_directories(self):
         return not self.empty_repository
 
     def has_revisions(self):
         return not self.empty_repository
 
     def has_releases(self):
         return not self.empty_repository
 
     def fetch_data(self):
         """Fetch the data from the data source."""
         pass
 
     def get_contents(self):
         """Get the contents that need to be loaded."""
         # NOTE: This method generates blobs twice to reduce memory usage
         # without generating disk writes.
self.file_node_to_hash = {} hash_to_info = {} self.num_contents = 0 contents = {} missing_contents = set() for blob, node_info in self.br.yield_all_blobs(): self.num_contents += 1 file_name = node_info[0] header = node_info[2] length = len(blob) if header['linknode'] in self.reduce_effort: algorithms = [ALGO] else: algorithms = DEFAULT_ALGORITHMS h = MultiHash.from_data(blob, hash_names=algorithms) content = h.digest() content['length'] = length blob_hash = content[ALGO] self.file_node_to_hash[header['node']] = blob_hash if header['linknode'] in self.reduce_effort: continue hash_to_info[blob_hash] = node_info contents[blob_hash] = content missing_contents.add(blob_hash) if file_name == b'.hgtags': # https://www.mercurial-scm.org/wiki/GitConcepts#Tag_model # overwrite until the last one self.tags = (t for t in blob.split(b'\n') if t != b'') if contents: missing_contents = set( self.storage.content_missing( list(contents.values()), key_hash=ALGO ) ) # Clusters needed blobs by file offset and then only fetches the # groups at the needed offsets. focs = {} # "file/offset/contents" for blob_hash in missing_contents: _, file_offset, header = hash_to_info[blob_hash] focs.setdefault(file_offset, {}) focs[file_offset][header['node']] = blob_hash hash_to_info = None for offset, node_hashes in sorted(focs.items()): for header, data, *_ in self.br.yield_group_objects( group_offset=offset ): node = header['node'] if node in node_hashes: blob, meta = self.br.extract_meta_from_blob(data) content = contents.pop(node_hashes[node], None) if content: content['data'] = blob yield content_for_storage( content, log=self.log, max_content_size=self.content_max_size_limit, origin_url=self.origin['url'] ) def load_directories(self): """This is where the work is done to convert manifest deltas from the repository bundle into SWH directories. 
""" self.mnode_to_tree_id = {} cache_hints = self.br.build_manifest_hints() def tree_size(t): return t.size() self.trees = SelectiveCache(cache_hints=cache_hints, size_function=tree_size, filename=self.cache_filename2, max_size=self.cache2_size) tree = SimpleTree() for header, added, removed in self.br.yield_all_manifest_deltas( cache_hints ): node = header['node'] basenode = header['basenode'] tree = self.trees.fetch(basenode) or tree # working tree for path in removed.keys(): tree = tree.remove_tree_node_for_path(path) for path, info in added.items(): file_node, is_symlink, perms_code = info tree = tree.add_blob( path, self.file_node_to_hash[file_node], is_symlink, perms_code ) if header['linknode'] in self.reduce_effort: self.trees.store(node, tree) else: new_dirs = [] self.mnode_to_tree_id[node] = tree.hash_changed(new_dirs) self.trees.store(node, tree) yield header, tree, new_dirs def get_directories(self): """Compute directories to load """ dirs = {} self.num_directories = 0 for _, _, new_dirs in self.load_directories(): for d in new_dirs: self.num_directories += 1 dirs[d['id']] = d missing_dirs = list(dirs.keys()) if missing_dirs: missing_dirs = self.storage.directory_missing(missing_dirs) for _id in missing_dirs: yield dirs[_id] dirs = {} def get_revisions(self): """Compute revisions to load """ revisions = {} self.num_revisions = 0 for header, commit in self.br.yield_all_changesets(): if header['node'] in self.reduce_effort: continue self.num_revisions += 1 date_dict = identifiers.normalize_timestamp( int(commit['time'].timestamp()) ) author_dict = converters.parse_author(commit['user']) if commit['manifest'] == Bundle20Reader.NAUGHT_NODE: directory_id = SimpleTree().hash_changed() else: directory_id = self.mnode_to_tree_id[commit['manifest']] extra_meta = [] extra = commit.get('extra') if extra: for e in extra.split(b'\x00'): k, v = e.split(b':', 1) k = k.decode('utf-8') # transplant_source stores binary reference to a changeset # prefer to dump hexadecimal one in the revision metadata if k == 'transplant_source': v = hash_to_hex(v) extra_meta.append([k, v]) revision = { 'author': author_dict, 'date': date_dict, 'committer': author_dict, 'committer_date': date_dict, 'type': 'hg', 'directory': directory_id, 'message': commit['message'], 'metadata': { 'node': hash_to_hex(header['node']), 'extra_headers': [ ['time_offset_seconds', str(commit['time_offset_seconds']).encode('utf-8')], ] + extra_meta }, 'synthetic': False, 'parents': [] } p1 = self.node_2_rev.get(header['p1']) p2 = self.node_2_rev.get(header['p2']) if p1: revision['parents'].append(p1) if p2: revision['parents'].append(p2) revision['id'] = hash_to_bytes( identifiers.revision_identifier(revision) ) self.node_2_rev[header['node']] = revision['id'] revisions[revision['id']] = revision # Converts heads to use swh ids self.heads = { branch_name: (pointer_nature, self.node_2_rev[node_id]) for branch_name, (pointer_nature, node_id) in self.heads.items() } missing_revs = revisions.keys() if missing_revs: missing_revs = set( self.storage.revision_missing(list(missing_revs)) ) for r in missing_revs: yield revisions[r] self.mnode_to_tree_id = None def _read_tag(self, tag, split_byte=b' '): node, *name = tag.split(split_byte) name = split_byte.join(name) return node, name def get_releases(self): """Get the releases that need to be loaded.""" self.num_releases = 0 releases = {} missing_releases = [] for t in self.tags: self.num_releases += 1 node, name = self._read_tag(t) node = node.decode() node_bytes = hash_to_bytes(node) if not 
TAG_PATTERN.match(node): self.log.warn('Wrong pattern (%s) found in tags. Skipping' % ( node, )) continue if node_bytes not in self.node_2_rev: self.log.warn('No matching revision for tag %s ' '(hg changeset: %s). Skipping' % (name.decode(), node)) continue tgt_rev = self.node_2_rev[node_bytes] release = { 'name': name, 'target': tgt_rev, 'target_type': 'revision', 'message': None, 'metadata': None, 'synthetic': False, 'author': {'name': None, 'email': None, 'fullname': b''}, 'date': None } id_hash = hash_to_bytes( identifiers.release_identifier(release)) release['id'] = id_hash missing_releases.append(id_hash) releases[id_hash] = release self.releases[name] = id_hash if missing_releases: missing_releases = set( self.storage.release_missing(missing_releases)) for _id in missing_releases: yield releases[_id] def get_snapshot(self): """Get the snapshot that need to be loaded.""" branches = {} for name, (pointer_nature, target) in self.heads.items(): branches[name] = {'target': target, 'target_type': 'revision'} if pointer_nature == HEAD_POINTER_NAME: branches[b'HEAD'] = {'target': name, 'target_type': 'alias'} for name, target in self.releases.items(): branches[name] = {'target': target, 'target_type': 'release'} snap = { 'id': None, 'branches': branches, } snap['id'] = identifiers.identifier_to_bytes( identifiers.snapshot_identifier(snap)) return snap def get_fetch_history_result(self): """Return the data to store in fetch_history.""" return { 'contents': self.num_contents, 'directories': self.num_directories, 'revisions': self.num_revisions, 'releases': self.num_releases, } def load_status(self): snapshot = self.get_snapshot() load_status = 'eventful' if (self.last_visit is not None and self.last_visit['snapshot'] == snapshot['id']): load_status = 'uneventful' return { 'status': load_status, } class HgArchiveBundle20Loader(HgBundle20Loader): """Mercurial loader for repository wrapped within archives. 
""" def __init__(self): super().__init__( logging_class='swh.loader.mercurial.HgArchiveBundle20Loader') self.temp_dir = None def prepare(self, *, origin_url, archive_path, visit_date): self.temp_dir = tmp_extract(archive=archive_path, dir=self.temp_directory, prefix=TEMPORARY_DIR_PREFIX_PATTERN, suffix='.dump-%s' % os.getpid(), log=self.log, source=origin_url) repo_name = os.listdir(self.temp_dir)[0] directory = os.path.join(self.temp_dir, repo_name) super().prepare(origin_url=origin_url, visit_date=visit_date, directory=directory) def cleanup(self): if self.temp_dir and os.path.exists(self.temp_dir): rmtree(self.temp_dir) super().cleanup() diff --git a/swh/loader/mercurial/tests/common.py b/swh/loader/mercurial/tests/common.py index f24cac7..2e9f1ea 100644 --- a/swh/loader/mercurial/tests/common.py +++ b/swh/loader/mercurial/tests/common.py @@ -1,63 +1,64 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.loader.mercurial.loader import ( HgBundle20Loader, HgArchiveBundle20Loader ) _LOADER_TEST_CONFIG = { 'bundle_filename': 'HG20_none_bundle', 'cache1_size': 838860800, 'cache2_size': 838860800, 'content_packet_size': 100000, 'content_packet_size_bytes': 1073741824, 'content_size_limit': 104857600, 'directory_packet_size': 25000, + 'clone_timeout_seconds': 2 * 3600, 'log_db': 'dbname=softwareheritage-log', 'occurrence_packet_size': 100000, 'reduce_effort': False, 'release_packet_size': 100000, 'revision_packet_size': 100000, 'save_data': False, 'save_data_path': '', 'send_contents': True, 'send_directories': True, 'send_occurrences': True, 'send_releases': True, 'send_revisions': True, 'send_snapshot': True, 'storage': {'args': {}, 'cls': 'memory'}, 'temp_directory': '/tmp/swh.loader.mercurial' } class BaseHgLoaderMemoryStorage: """The base mercurial loader to test. Mixin behavior changed to: - use an in-memory storage - not use the default configuration loading mechanism At the end of the tests, you can make sure you have the rights objects. """ def __init__(self): super().__init__() self.origin_id = 1 self.visit = 1 def parse_config_file(self, *args, **kwargs): return _LOADER_TEST_CONFIG class HgLoaderMemoryStorage(BaseHgLoaderMemoryStorage, HgBundle20Loader): pass class HgArchiveLoaderMemoryStorage(BaseHgLoaderMemoryStorage, HgArchiveBundle20Loader): pass diff --git a/swh/loader/mercurial/tests/test_loader.py b/swh/loader/mercurial/tests/test_loader.py index 74fc0de..e093961 100644 --- a/swh/loader/mercurial/tests/test_loader.py +++ b/swh/loader/mercurial/tests/test_loader.py @@ -1,313 +1,368 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import logging import os +import time from unittest.mock import patch +import hglib +import pytest + from swh.loader.core.tests import BaseLoaderTest from swh.storage.algos.snapshot import snapshot_get_all_branches from .common import HgLoaderMemoryStorage, HgArchiveLoaderMemoryStorage +from ..loader import HgBundle20Loader, CloneTimeoutError class BaseHgLoaderTest(BaseLoaderTest): """Mixin base loader test to prepare the mercurial repository to uncompress, load and test the results. 
This sets up """ def setUp(self, loader=HgLoaderMemoryStorage, archive_name='the-sandbox.tgz', filename='the-sandbox', uncompress_archive=True): super().setUp(archive_name=archive_name, filename=filename, prefix_tmp_folder_name='swh.loader.mercurial.', start_path=os.path.dirname(__file__), uncompress_archive=uncompress_archive) self.loader = loader() self.storage = self.loader.storage class WithoutReleaseLoaderTest(BaseHgLoaderTest): """Load a mercurial repository without release """ def test_load(self): """Load a repository with multiple branches results in 1 snapshot """ # when self.loader.load( origin_url=self.repo_url, visit_date='2016-05-03 15:16:32+00', directory=self.destination_path) # then self.assertCountContents(2) self.assertCountDirectories(3) self.assertCountReleases(0) self.assertCountRevisions(58) tip_revision_develop = 'a9c4534552df370f43f0ef97146f393ef2f2a08c' tip_revision_default = '70e750bb046101fdced06f428e73fee471509c56' # same from rev 3 onward directory_hash = '180bd57623a7c2c47a8c43514a5f4d903503d0aa' # cf. test_loader.org for explaining from where those hashes # come from expected_revisions = { # revision hash | directory hash # noqa 'aafb69fd7496ca617f741d38c40808ff2382aabe': 'e2e117569b086ceabeeedee4acd95f35298d4553', # noqa 'b6932cb7f59e746899e4804f3d496126d1343615': '9cd8160c67ac4b0bc97e2e2cd918a580425167d3', # noqa tip_revision_default: directory_hash, '18012a93d5aadc331c468dac84b524430f4abc19': directory_hash, 'bec4c0a31b0b2502f44f34aeb9827cd090cca621': directory_hash, '5f4eba626c3f826820c4475d2d81410759ec911b': directory_hash, 'dcba06661c607fe55ec67b1712d153b69f65e38c': directory_hash, 'c77e776d22548d47a8d96463a3556172776cd59b': directory_hash, '61d762d65afb3150e2653d6735068241779c1fcf': directory_hash, '40def747398c76ceec1bd248e3a6cb2a52e22dc5': directory_hash, '6910964416438ca8d1698f6295871d727c4d4851': directory_hash, 'be44d5e6cc66580f59c108f8bff5911ee91a22e4': directory_hash, 'c4a95d5097519dedac437fddf0ef775136081241': directory_hash, '32eb0354a660128e205bf7c3a84b46040ef70d92': directory_hash, 'dafa445964230e808148db043c126063ea1dc9b6': directory_hash, 'a41e2a548ba51ee47f22baad8e88994853d3e2f5': directory_hash, 'dc3e3ab7fe257d04769528e5e17ad9f1acb44659': directory_hash, 'd2164061453ecb03d4347a05a77db83f706b8e15': directory_hash, '34192ceef239b8b72141efcc58b1d7f1676a18c9': directory_hash, '2652147529269778757d96e09aaf081695548218': directory_hash, '4d640e8064fe69b4c851dfd43915c431e80c7497': directory_hash, 'c313df50bfcaa773dcbe038d00f8bd770ba997f8': directory_hash, '769db00b34b9e085dc699c8f1550c95793d0e904': directory_hash, '2973e5dc9568ac491b198f6b7f10c44ddc04e0a3': directory_hash, 'be34b8c7857a6c04e41cc06b26338d8e59cb2601': directory_hash, '24f45e41637240b7f9e16d2791b5eacb4a406d0f': directory_hash, '62ff4741eac1821190f6c2cdab7c8a9d7db64ad0': directory_hash, 'c346f6ff7f42f2a8ff867f92ab83a6721057d86c': directory_hash, 'f2afbb94b319ef5d60823859875284afb95dcc18': directory_hash, '4e2dc6d6073f0b6d348f84ded52f9143b10344b9': directory_hash, '31cd7c5f669868651c57e3a2ba25ac45f76fa5cf': directory_hash, '25f5b27dfa5ed15d336188ef46bef743d88327d4': directory_hash, '88b80615ed8561be74a700b92883ec0374ddacb0': directory_hash, '5ee9ea92ed8cc1737b7670e39dab6081c64f2598': directory_hash, 'dcddcc32740d2de0e1403e21a5c4ed837b352992': directory_hash, '74335db9f45a5d1c8133ff7a7db5ed7a8d4a197b': directory_hash, 'cb36b894129ca7910bb81c457c72d69d5ff111bc': directory_hash, 'caef0cb155eb6c55215aa59aabe04a9c702bbe6a': directory_hash, '5017ce0b285351da09a2029ea2cf544f79b593c7': 
directory_hash, '17a62618eb6e91a1d5d8e1246ccedae020d3b222': directory_hash, 'a1f000fb8216838aa2a120738cc6c7fef2d1b4d8': directory_hash, '9f82d95bd3edfb7f18b1a21d6171170395ea44ce': directory_hash, 'a701d39a17a9f48c61a06eee08bd9ac0b8e3838b': directory_hash, '4ef794980f820d44be94b2f0d53eb34d4241638c': directory_hash, 'ddecbc16f4c916c39eacfcb2302e15a9e70a231e': directory_hash, '3565e7d385af0745ec208d719e469c2f58be8e94': directory_hash, 'c875bad563a73a25c5f3379828b161b1441a7c5d': directory_hash, '94be9abcf9558213ff301af0ecd8223451ce991d': directory_hash, '1ee770fd10ea2d8c4f6e68a1dbe79378a86611e0': directory_hash, '553b09724bd30d9691b290e157b27a73e2d3e537': directory_hash, '9e912851eb64e3a1e08fbb587de7a4c897ce5a0a': directory_hash, '9c9e0ff08f215a5a5845ce3dbfc5b48c8050bdaf': directory_hash, 'db9e625ba90056304897a94c92e5d27bc60f112d': directory_hash, '2d4a801c9a9645fcd3a9f4c06418d8393206b1f3': directory_hash, 'e874cd5967efb1f45282e9f5ce87cc68a898a6d0': directory_hash, 'e326a7bbb5bc00f1d8cacd6108869dedef15569c': directory_hash, '3ed4b85d30401fe32ae3b1d650f215a588293a9e': directory_hash, tip_revision_develop: directory_hash, } self.assertRevisionsContain(expected_revisions) self.assertCountSnapshots(1) expected_snapshot = { 'id': '3b8fe58e467deb7597b12a5fd3b2c096b8c02028', 'branches': { 'develop': { 'target': tip_revision_develop, 'target_type': 'revision' }, 'default': { 'target': tip_revision_default, 'target_type': 'revision' }, 'HEAD': { 'target': 'develop', 'target_type': 'alias', } } } self.assertSnapshotEqual(expected_snapshot) self.assertEqual(self.loader.load_status(), {'status': 'eventful'}) self.assertEqual(self.loader.visit_status(), 'full') def test_load_status(self): # first visit of the mercurial repository self.loader.load( origin_url=self.repo_url, visit_date='2016-05-03 15:16:32+00', directory=self.destination_path) self.assertEqual(self.loader.load_status(), {'status': 'eventful'}) self.assertEqual(self.loader.visit_status(), 'full') # second visit with no changes in the mercurial repository # since the first one self.loader.load( origin_url=self.repo_url, visit_date='2016-05-04 14:12:21+00', directory=self.destination_path) self.assertEqual(self.loader.load_status(), {'status': 'uneventful'}) self.assertEqual(self.loader.visit_status(), 'full') class CommonHgLoaderData: def assert_data_ok(self): # then self.assertCountContents(3) self.assertCountDirectories(3) self.assertCountReleases(1) self.assertCountRevisions(3) tip_release = '515c4d72e089404356d0f4b39d60f948b8999140' self.assertReleasesContain([tip_release]) tip_revision_default = 'c3dbe4fbeaaa98dd961834e4007edb3efb0e2a27' # cf. 
test_loader.org for explaining from where those hashes # come from expected_revisions = { # revision hash | directory hash # noqa '93b48d515580522a05f389bec93227fc8e43d940': '43d727f2f3f2f7cb3b098ddad1d7038464a4cee2', # noqa '8dd3db5d5519e4947f035d141581d304565372d2': 'b3f85f210ff86d334575f64cb01c5bf49895b63e', # noqa tip_revision_default: '8f2be433c945384c85920a8e60f2a68d2c0f20fb', } self.assertRevisionsContain(expected_revisions) self.assertCountSnapshots(1) expected_snapshot = { 'id': 'd35668e02e2ba4321dc951cd308cf883786f918a', 'branches': { 'default': { 'target': tip_revision_default, 'target_type': 'revision' }, '0.1': { 'target': tip_release, 'target_type': 'release' }, 'HEAD': { 'target': 'default', 'target_type': 'alias', } } } self.assertSnapshotEqual(expected_snapshot) self.assertEqual(self.loader.load_status(), {'status': 'eventful'}) self.assertEqual(self.loader.visit_status(), 'full') class WithReleaseLoaderTest(BaseHgLoaderTest, CommonHgLoaderData): """Load a mercurial repository with release """ def setUp(self): super().setUp(archive_name='hello.tgz', filename='hello') def test_load(self): """Load a repository with tags results in 1 snapshot """ # when self.loader.load( origin_url=self.repo_url, visit_date='2016-05-03 15:16:32+00', directory=self.destination_path) self.assert_data_ok() class ArchiveLoaderTest(BaseHgLoaderTest, CommonHgLoaderData): """Load a mercurial repository archive with release """ def setUp(self): super().setUp(loader=HgArchiveLoaderMemoryStorage, archive_name='hello.tgz', filename='hello', uncompress_archive=False) def test_load(self): """Load a mercurial repository archive with tags results in 1 snapshot """ # when self.loader.load( origin_url=self.repo_url, visit_date='2016-05-03 15:16:32+00', archive_path=self.destination_path) self.assert_data_ok() @patch('swh.loader.mercurial.archive_extract.patoolib') def test_load_with_failure(self, mock_patoo): mock_patoo.side_effect = ValueError # when r = self.loader.load( origin_url=self.repo_url, visit_date='2016-05-03 15:16:32+00', archive_path=self.destination_path) self.assertEqual(r, {'status': 'failed'}) self.assertCountContents(0) self.assertCountDirectories(0) self.assertCountRevisions(0) self.assertCountReleases(0) self.assertCountSnapshots(0) class WithTransplantLoaderTest(BaseHgLoaderTest): """Load a mercurial repository where transplant operations have been used. 
""" def setUp(self): super().setUp(archive_name='transplant.tgz', filename='transplant') def test_load(self): # load hg repository self.loader.load( origin_url=self.repo_url, visit_date='2019-05-23 12:06:00+00', directory=self.destination_path) # collect swh revisions origin_url = self.storage.origin_get([ {'type': 'hg', 'url': self.repo_url}])[0]['url'] visit = self.storage.origin_visit_get_latest( origin_url, require_snapshot=True) revisions = [] snapshot = snapshot_get_all_branches(self.storage, visit['snapshot']) for branch in snapshot['branches'].values(): if branch['target_type'] != 'revision': continue revisions.append(branch['target']) # extract original changesets info and the transplant sources hg_changesets = set() transplant_sources = set() for rev in self.storage.revision_log(revisions): hg_changesets.add(rev['metadata']['node']) for k, v in rev['metadata']['extra_headers']: if k == 'transplant_source': transplant_sources.add(v) # check extracted data are valid self.assertTrue(len(hg_changesets) > 0) self.assertTrue(len(transplant_sources) > 0) self.assertTrue(transplant_sources.issubset(hg_changesets)) + + +def test_clone_with_timeout_timeout(caplog, tmp_path, monkeypatch): + log = logging.getLogger('test_clone_with_timeout') + + def clone_timeout(source, dest): + time.sleep(60) + + monkeypatch.setattr(hglib, "clone", clone_timeout) + + with pytest.raises(CloneTimeoutError): + HgBundle20Loader.clone_with_timeout( + log, 'https://www.mercurial-scm.org/repo/hello', tmp_path, 1 + ) + + for record in caplog.records: + assert record.levelname == 'WARNING' + assert ( + 'https://www.mercurial-scm.org/repo/hello' in record.getMessage() + ) + assert record.args == ('https://www.mercurial-scm.org/repo/hello', 1) + + +def test_clone_with_timeout_returns(caplog, tmp_path, monkeypatch): + log = logging.getLogger('test_clone_with_timeout') + + def clone_return(source, dest): + return (source, dest) + + monkeypatch.setattr(hglib, "clone", clone_return) + + assert HgBundle20Loader.clone_with_timeout( + log, 'https://www.mercurial-scm.org/repo/hello', tmp_path, 1 + ) == ('https://www.mercurial-scm.org/repo/hello', tmp_path) + + +def test_clone_with_timeout_exception(caplog, tmp_path, monkeypatch): + log = logging.getLogger('test_clone_with_timeout') + + def clone_return(source, dest): + raise ValueError('Test exception') + + monkeypatch.setattr(hglib, "clone", clone_return) + + with pytest.raises(ValueError) as excinfo: + HgBundle20Loader.clone_with_timeout( + log, 'https://www.mercurial-scm.org/repo/hello', tmp_path, 1 + ) + assert 'Test exception' in excinfo.value.args[0]