swh/loader/mercurial/loader.py
-# Copyright (C) 2017-2018 The Software Heritage developers
+# Copyright (C) 2017-2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 """This document contains a SWH loader for ingesting repository data
 from Mercurial version 2 bundle files.
 """

@@ ... 60 lines skipped ... @@     ADDITIONAL_CONFIG = {
         'temp_directory': ('str', '/tmp'),
         'cache1_size': ('int', 800*1024*1024),
         'cache2_size': ('int', 800*1024*1024),
         'clone_timeout_seconds': ('int', 7200),
     }

     visit_type = 'hg'
-    def __init__(self, logging_class='swh.loader.mercurial.Bundle20Loader'):
+    def __init__(self, url, visit_date=None, directory=None,
+                 logging_class='swh.loader.mercurial.Bundle20Loader'):
         super().__init__(logging_class=logging_class)
+        self.origin_url = url
+        self.visit_date = visit_date
+        self.directory = directory
         self.content_max_size_limit = self.config['content_size_limit']
         self.bundle_filename = self.config['bundle_filename']
         self.reduce_effort_flag = self.config['reduce_effort']
         self.empty_repository = None
         self.temp_directory = self.config['temp_directory']
         self.cache1_size = self.config['cache1_size']
         self.cache2_size = self.config['cache2_size']
         self.clone_timeout = self.config['clone_timeout_seconds']
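Note: with this change the loader is configured entirely at construction time instead of through prepare(). A minimal sketch of the new calling convention (the URL and date below are illustrative, and load() is the entry point inherited from the core loader):

    loader = HgBundle20Loader(
        'https://hg.example.org/some-repo',  # assumed example URL
        visit_date='2019-02-27')             # str or datetime, parsed later
    loader.load()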
@@ ... 37 lines skipped ... @@     def get_heads(self, repo):
         bookmarks = repo.bookmarks()
         if bookmarks and bookmarks[0]:
             for bookmark_name, _, target_short in bookmarks[0]:
                 target = repo[target_short].node()
                 b[bookmark_name] = (None, hash_to_bytes(target.decode()))
         return b
-    def prepare_origin_visit(self, *, origin_url, visit_date, **kwargs):
-        self.origin_url = origin_url
-        self.origin = {'url': self.origin_url, 'type': self.visit_type}
+    def prepare_origin_visit(self, *args, **kwargs):
+        self.origin = {'url': self.origin_url}
+        visit_date = self.visit_date
         if isinstance(visit_date, str):  # visit_date can be string or datetime
             visit_date = parser.parse(visit_date)
         self.visit_date = visit_date
         self.last_visit = self.storage.origin_visit_get_latest(
-            self.origin['url'], require_snapshot=True)
+            self.origin_url, require_snapshot=True)
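visit_date may arrive as either a string or a datetime; strings are normalized through dateutil, as the hunk above shows. For instance (a standalone illustration, not part of this diff):

    from dateutil import parser

    visit_date = parser.parse('2019-02-27 12:00:00+00:00')
    # -> datetime.datetime(2019, 2, 27, 12, 0, tzinfo=tzutc())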
     @staticmethod
     def clone_with_timeout(log, origin, destination, timeout):
         queue = billiard.Queue()
         start = time.monotonic()

         def do_clone(queue, origin, destination):
             try:
@@ ... 22 lines skipped ... @@     def clone_with_timeout(log, origin, destination, timeout):
                 continue

         process.join()
         if isinstance(result, Exception):
             raise result from None

         return result
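For reference, the elided middle of clone_with_timeout follows a standard pattern: run the clone in a child billiard process and poll the queue until a result arrives or the deadline passes. The sketch below is reconstructed around the visible lines; the poll interval and the CloneTimeoutError type are assumptions, not the verbatim elided code:

    from queue import Empty  # billiard queues raise the stdlib Empty

    process = billiard.Process(target=do_clone,
                               args=(queue, origin, destination))
    process.start()
    while True:
        try:
            result = queue.get(timeout=0.1)  # poll interval: an assumption
            break
        except Empty:
            if timeout and time.monotonic() - start > timeout:
                process.terminate()
                # hypothetical exception type for the timeout case
                raise CloneTimeoutError(origin, timeout)
            continue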
-    def prepare(self, *, origin_url, visit_date, directory=None):
+    def prepare(self, *args, **kwargs):
         """Prepare the necessary steps to load an actual remote or local
         repository.

         To load a local repository, pass the optional directory
         parameter filled with a path to a real local folder.

         To load a remote repository, pass the optional directory
         parameter as None.

         Args:
             origin_url (str): Origin url to load
             visit_date (str/datetime): Date of the visit
             directory (str/None): The local directory to load

         """
         self.branches = {}
         self.tags = []
         self.releases = {}
         self.node_2_rev = {}

+        directory = self.directory
         if not directory:  # remote repository
             self.working_directory = mkdtemp(
                 prefix=TEMPORARY_DIR_PREFIX_PATTERN,
                 suffix='-%s' % os.getpid(),
                 dir=self.temp_directory)
             os.makedirs(self.working_directory, exist_ok=True)
             self.hgdir = self.working_directory

             self.log.debug('Cloning %s to %s with timeout %s seconds',
-                           self.origin['url'], self.hgdir, self.clone_timeout)
-            self.clone_with_timeout(self.log, self.origin['url'], self.hgdir,
-                                    self.clone_timeout)
+                           self.origin_url, self.hgdir, self.clone_timeout)
+            self.clone_with_timeout(self.log, self.origin_url, self.hgdir,
+                                    self.clone_timeout)
         else:  # local repository
             self.working_directory = None
             self.hgdir = directory

         self.bundle_path = os.path.join(self.hgdir, self.bundle_filename)
         self.log.debug('Bundling at %s' % self.bundle_path)
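As the docstring above notes, the two modes now differ only in whether directory was set at construction time. An illustrative comparison (the URL and path are assumptions):

    # Remote origin: directory is None, so prepare() clones into a temp dir.
    remote = HgBundle20Loader('https://hg.example.org/some-repo')

    # Local mirror: pass directory up front; no clone is performed.
    local = HgBundle20Loader('https://hg.example.org/some-repo',
                             directory='/srv/mirrors/some-repo')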
@@ ... 114 lines skipped ... @@     def get_contents(self):
             blob, meta = self.br.extract_meta_from_blob(data)
             content = contents.pop(node_hashes[node], None)
             if content:
                 content['data'] = blob
                 yield content_for_storage(
                     content,
                     log=self.log,
                     max_content_size=self.content_max_size_limit,
-                    origin_url=self.origin['url']
+                    origin_url=self.origin_url
                 )
     def load_directories(self):
         """This is where the work is done to convert manifest deltas from the
         repository bundle into SWH directories.
         """
         self.mnode_to_tree_id = {}
@@ ... 219 lines skipped ... @@     def load_status(self):
             'status': load_status,
         }
 class HgArchiveBundle20Loader(HgBundle20Loader):
     """Mercurial loader for repositories wrapped within archives.
     """
-    def __init__(self):
+    def __init__(self, url, visit_date=None, archive_path=None):
         super().__init__(
+            url, visit_date=visit_date,
             logging_class='swh.loader.mercurial.HgArchiveBundle20Loader')
         self.temp_dir = None
+        self.archive_path = archive_path
-    def prepare(self, *, origin_url, archive_path, visit_date):
-        self.temp_dir = tmp_extract(archive=archive_path,
+    def prepare(self, *args, **kwargs):
+        self.temp_dir = tmp_extract(archive=self.archive_path,
                                     dir=self.temp_directory,
                                     prefix=TEMPORARY_DIR_PREFIX_PATTERN,
                                     suffix='.dump-%s' % os.getpid(),
                                     log=self.log,
-                                    source=origin_url)
+                                    source=self.origin_url)

         repo_name = os.listdir(self.temp_dir)[0]
-        directory = os.path.join(self.temp_dir, repo_name)
-        super().prepare(origin_url=origin_url,
-                        visit_date=visit_date, directory=directory)
+        self.directory = os.path.join(self.temp_dir, repo_name)
+        super().prepare(*args, **kwargs)
     def cleanup(self):
         if self.temp_dir and os.path.exists(self.temp_dir):
             rmtree(self.temp_dir)
         super().cleanup()
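With archive_path moved to the constructor, the archive loader follows the same calling convention as its parent: prepare() only extracts the tarball, records the extracted directory, and delegates. An illustrative call (the URL and path are assumptions):

    loader = HgArchiveBundle20Loader(
        'https://example.org/some-repo.tgz',  # assumed origin URL
        visit_date='2019-02-27',
        archive_path='/tmp/some-repo.tgz')    # assumed local tarball path
    loader.load()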