diff --git a/swh/loader/svn/loader.py b/swh/loader/svn/loader.py
--- a/swh/loader/svn/loader.py
+++ b/swh/loader/svn/loader.py
@@ -8,10 +8,14 @@
 
 """
 
+import mmap
 import os
+import re
 import shutil
 import tempfile
 
+from subprocess import run, PIPE
+
 from swh.core import utils
 from swh.model import hashutil
 from swh.model.from_disk import Directory
@@ -21,7 +25,9 @@
 from swh.loader.core.utils import clean_dangling_folders
 
 from . import svn, converters
-from .utils import init_svn_repo_from_archive_dump
+from .utils import (
+    init_svn_repo_from_dump, init_svn_repo_from_archive_dump
+)
 from .exception import SvnLoaderEventful, SvnLoaderUneventful
 from .exception import SvnLoaderHistoryAltered
 
@@ -78,6 +84,7 @@
         self.debug = self.config['debug']
         self.last_seen_revision = None
         self.temp_directory = self.config['temp_directory']
+        self.svnrepo = None
 
     def pre_cleanup(self):
         """Cleanup potential dangling files from prior runs (e.g. OOM killed
@@ -92,6 +99,8 @@
         """Clean up the svn repository's working representation on disk.
 
         """
+        if not self.svnrepo:
+            return
         if self.debug:
             self.log.error('''NOT FOR PRODUCTION - debug flag activated
 Local repository not cleaned up for investigation: %s''' % (
@@ -541,3 +550,200 @@
             self.temp_dir, os.path.basename(self.repo_path))
         self.log.debug(msg)
         shutil.rmtree(self.temp_dir)
+
+
+class SWHSvnLoaderFromRemoteDump(SWHSvnLoaderFromDumpArchive):
+    """
+    Create a subversion repository dump using the rsvndump utility,
+    mount it locally and load the repository from it.
+
+    The dump is generated and mounted when the loader is instantiated.
+    Dump failures are recorded in ``dump_error`` and raised by
+    ``prepare``; an incomplete but usable dump is loaded anyway and the
+    visit is then reported as ``partial``.
+    """
+    def __init__(self, origin_url, svn_url=None):
+        """Dump the remote subversion repository and mount it locally.
+
+        Args:
+            origin_url (str): url of the subversion origin, used to look
+                up the last revision already loaded into the archive
+            svn_url (str): url of the remote repository to dump with
+                rsvndump; defaults to origin_url
+
+        Raises:
+            ValueError: from init_svn_repo_from_dump when the generated
+                dump cannot be mounted
+
+        """
+        # Deliberately bypass SWHSvnLoaderFromDumpArchive.__init__: the
+        # repository is mounted below from a dump generated by rsvndump
+        super(SWHSvnLoaderFromDumpArchive, self).__init__()
+        if svn_url is None:
+            svn_url = origin_url
+        self.dump_temp_dir = None
+        self.temp_dir = None
+        self.repo_path = None
+        self.dump_error = None
+        self.partial_dump = False
+
+        # First, check if previous revisions have been loaded for the
+        # subversion origin and get the number of the last one
+        last_loaded_svn_rev = self._last_loaded_svn_revision(origin_url)
+
+        # Build the rsvndump command line
+        rsvndump_cmd = ['rsvndump']
+        if last_loaded_svn_rev >= 0:
+            # Previous revisions have already been loaded, so dump
+            # only those we are interested in while padding already loaded
+            # ones to keep the original revision numbers in the dump file
+            rsvndump_cmd += ['-r', '%s:HEAD' % last_loaded_svn_rev,
+                             '--keep-revnums']
+        # Use deltas to get a smaller dump file
+        rsvndump_cmd += ['--deltas', svn_url]
+
+        self.dump_temp_dir = tempfile.mkdtemp()
+        try:
+            dump_name = ''.join(c for c in svn_url if c.isalnum())
+            # The dump is mounted uncompressed (init_svn_repo_from_dump
+            # is called without gzip=True), so do not name it .gz
+            dump_path = os.path.join(self.dump_temp_dir,
+                                     '%s.svndump' % dump_name)
+            if self._dump_repository(rsvndump_cmd, dump_path,
+                                     last_loaded_svn_rev):
+                self.temp_dir, self.repo_path = init_svn_repo_from_dump(
+                    dump_path,
+                    prefix=TEMPORARY_DIR_PREFIX_PATTERN,
+                    suffix='-%s' % os.getpid(),
+                    root_dir=self.temp_directory)
+        except Exception:
+            # The caller never gets a loader instance, so cleanup() will
+            # not run: remove the dump directory here instead
+            shutil.rmtree(self.dump_temp_dir, ignore_errors=True)
+            raise
+
+    def _last_loaded_svn_revision(self, origin_url):
+        """Return the number of the last subversion revision of the origin
+        already loaded into the archive, or -1 if it cannot be determined.
+
+        """
+        try:
+            origin = \
+                self.storage.origin_get({'type': 'svn', 'url': origin_url})
+            last_swh_rev = \
+                self.swh_latest_snapshot_revision(origin['id'])['revision']
+            last_swh_rev_headers = \
+                dict(last_swh_rev['metadata']['extra_headers'])
+            return int(last_swh_rev_headers['svn_revision'])
+        except Exception:
+            # Best effort: without that information, everything is dumped
+            self.log.debug('No previously loaded svn revision found for %s',
+                           origin_url, exc_info=True)
+            return -1
+
+    def _dump_repository(self, rsvndump_cmd, dump_path, last_loaded_svn_rev):
+        """Run rsvndump, writing the dump to dump_path.
+
+        Returns:
+            True if the dump file, possibly truncated after its last
+            successfully dumped revision, can be mounted; False otherwise,
+            with the reason stored in ``self.dump_error``.
+
+        """
+        self.log.debug('Executing %s', ' '.join(rsvndump_cmd))
+        # Capture stderr as successfully dumped revision numbers are
+        # printed to it
+        with open(dump_path, 'wb') as dump_file:
+            rsvndump = run(rsvndump_cmd, stdout=dump_file, stderr=PIPE)
+
+        # No errors, dump file can be mounted and loaded immediately
+        if rsvndump.returncode == 0:
+            return True
+
+        # There was an error but it does not mean that no revisions
+        # can be loaded.
+        last_dumped_rev = self._last_dumped_revision(rsvndump.stderr)
+        if last_dumped_rev is None:
+            self.dump_error = ('An error occurred when running rsvndump and '
+                               'no exploitable dump file has been generated.')
+            return False
+        if last_dumped_rev <= last_loaded_svn_rev:
+            self.dump_error = ('Last dumped subversion revision (%s) is not '
+                               'greater than the last one loaded into the '
+                               'archive (%s).') % (last_dumped_rev,
+                                                   last_loaded_svn_rev)
+            return False
+
+        self.log.debug('rsvndump did not dump all expected revisions '
+                       'but revisions range %s:%s are available in '
+                       'the generated dump file and will be loaded '
+                       'into the archive.', last_loaded_svn_rev + 1,
+                       last_dumped_rev)
+        # Truncate the dump file after the last successfully dumped
+        # revision to avoid the loading of corrupted data
+        self._truncate_dump(dump_path, last_dumped_rev + 1)
+        self.partial_dump = True
+        return True
+
+    @staticmethod
+    def _last_dumped_revision(rsvndump_stderr):
+        """Return the number of the last revision reported in rsvndump's
+        stderr output, or None if no line reports one.
+
+        """
+        # Scan from the end: the final line may be an error message
+        # rather than a revision report
+        for line in reversed(rsvndump_stderr.split(b'\n')):
+            match = re.search(b'.*revision ([0-9]+)', line)
+            if match:
+                return int(match.group(1))
+        return None
+
+    @staticmethod
+    def _truncate_dump(dump_path, revision):
+        """Truncate the dump file right before the header of the given
+        revision, if the file contains one.
+
+        """
+        if not os.path.getsize(dump_path):
+            # mmap cannot map an empty file
+            return
+        # Only match the header at the start of a line, not the same text
+        # in the middle of a line of versioned content
+        pattern = ('\nRevision-number: %s\n' % revision).encode()
+        with open(dump_path, 'r+b') as dump_file:
+            with mmap.mmap(dump_file.fileno(), 0,
+                           access=mmap.ACCESS_READ) as dump_map:
+                offset = dump_map.find(pattern)
+            if offset != -1:
+                # Keep the newline ending the previous record
+                dump_file.truncate(offset + 1)
+
+    def prepare(self, *args, **kwargs):
+        """Raise the error recorded while dumping, if any, otherwise
+        prepare the loading as the parent loader does.
+
+        """
+        if self.dump_error:
+            raise Exception(self.dump_error)
+        return super().prepare(*args, **kwargs)
+
+    def cleanup(self):
+        """Clean up the mounted repository, then always remove the
+        directory holding the dump file.
+
+        """
+        try:
+            super().cleanup()
+        finally:
+            if self.dump_temp_dir and os.path.exists(self.dump_temp_dir):
+                shutil.rmtree(self.dump_temp_dir)
+
+    def visit_status(self):
+        """Report the visit as partial when only part of the remote
+        history could be dumped.
+
+        """
+        if self.partial_dump:
+            return 'partial'
+        return super().visit_status()
diff --git a/swh/loader/svn/tasks.py b/swh/loader/svn/tasks.py
--- a/swh/loader/svn/tasks.py
+++ b/swh/loader/svn/tasks.py
@@ -5,7 +5,9 @@
 
 from swh.scheduler.task import Task
 
-from .loader import SWHSvnLoader, SWHSvnLoaderFromDumpArchive
+from .loader import (
+    SWHSvnLoader, SWHSvnLoaderFromDumpArchive, SWHSvnLoaderFromRemoteDump
+)
 
 
 class LoadSWHSvnRepositoryTsk(Task):
@@ -63,3 +65,27 @@
                            visit_date=visit_date,
                            archive_path=archive_path,
                            start_from_scratch=start_from_scratch)
+
+
+class DumpMountAndLoadSvnRepositoryTsk(Task):
+    task_queue = 'swh_loader_svn_dump_mount_and_load'
+
+    def run_task(self, *, svn_url, origin_url=None, visit_date=None,
+                 start_from_scratch=False):
+        """1. Dump the remote svn repository at svn_url with rsvndump and
+           mount the dump as a local svn repository.
+        2. Load it through the svn loader.
+        3. Clean up the mounted svn repository and the dump file.
+
+        origin_url defaults to svn_url, the url used to look up the
+        revisions already loaded by previous visits.
+
+        """
+        if origin_url is None:
+            origin_url = svn_url
+        loader = SWHSvnLoaderFromRemoteDump(origin_url, svn_url=svn_url)
+        loader.log = self.log
+        return loader.load(svn_url='file://%s' % loader.repo_path,
+                           origin_url=origin_url,
+                           visit_date=visit_date,
+                           start_from_scratch=start_from_scratch)
diff --git a/swh/loader/svn/utils.py b/swh/loader/svn/utils.py
--- a/swh/loader/svn/utils.py
+++ b/swh/loader/svn/utils.py
@@ -33,9 +33,9 @@
     return ts
 
 
-def init_svn_repo_from_archive_dump(archive_path, prefix=None, suffix=None,
-                                    root_dir='/tmp'):
-    """Given a path to an archive containing an svn dump.
+def init_svn_repo_from_dump(dump_path, prefix=None, suffix=None,
+                            root_dir='/tmp', gzip=False):
+    """Given a path to an svn dump (gzip-compressed if gzip is True).
     Initialize an svn repository with the content of said dump.
 
     Returns:
@@ -49,7 +49,7 @@
         and load the dump.
 
     """
-    project_name = os.path.basename(os.path.dirname(archive_path))
+    project_name = os.path.basename(os.path.dirname(dump_path))
     temp_dir = tempfile.mkdtemp(prefix=prefix, suffix=suffix, dir=root_dir)
 
     try:
@@ -63,7 +63,11 @@
                 'Failed to initialize empty svn repo for %s' %
                 project_name)
 
-        with Popen(['gzip', '-dc', archive_path], stdout=PIPE) as dump:
+        read_dump_cmd = ['cat', dump_path]
+        if gzip:
+            read_dump_cmd = ['gzip', '-dc', dump_path]
+
+        with Popen(read_dump_cmd, stdout=PIPE) as dump:
             cmd = ['svnadmin', 'load', '-q', repo_path]
             r = call(cmd, stdin=dump.stdout)
             if r != 0:
@@ -74,3 +78,23 @@
     except Exception as e:
         shutil.rmtree(temp_dir)
         raise e
+
+
+def init_svn_repo_from_archive_dump(archive_path, prefix=None, suffix=None,
+                                    root_dir='/tmp'):
+    """Given a path to an archive containing an svn dump.
+    Initialize an svn repository with the content of said dump.
+
+    Returns:
+        A tuple:
+        - temporary folder (str): containing the mounted repository
+        - repo_path (str): path to the mounted repository inside the
+                           temporary folder
+
+    Raises:
+        ValueError in case of failure to run the command to uncompress
+        and load the dump.
+
+    """
+    return init_svn_repo_from_dump(archive_path, prefix=prefix, suffix=suffix,
+                                   root_dir=root_dir, gzip=True)