# Copyright (C) 2015-2016  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

"""Namespace to deal with checks on git, svn and hg repository from
googlecode archives.

System requisites: svn, git, hg, unzip, pigz

"""

import glob
import logging
import os
import shutil
import tempfile

from subprocess import PIPE, Popen, check_call

from swh.core import config

from . import utils
from .fetcher import SWHGoogleArchiveFetcher

# Name of the per-project metadata file holding the repository type.
REPO_TYPE_FILENAME = 'project.json'
REPO_TYPE_KEY = 'repoType'

# Size thresholds (bytes) routing archives to the small/medium/huge
# checker queues.
DEFAULT_SMALL_LENGTH_DISPATCH = 100 * 1024 * 1024   # 100 MiB
DEFAULT_MEDIUM_LENGTH_DISPATCH = 500 * 1024 * 1024  # 500 MiB


class SWHGoogleArchiveDispatchChecker(config.SWHConfig):
    """A google archive 'integrity' checker.

    This checker will:
    - check the archive's length
    - if not ok, refetch the archive
    - depending on the archive's length, dispatch to another checker

    """
    def __init__(self):
        self.log = logging.getLogger(
            'swh.fetcher.google.SWHGoogleArchiveDispatchChecker')

    def process(self, archive_path, temp_root_dir):
        """Check that archive_path is complete, refetch it if not, then
        dispatch it to a size-appropriate checker task.

        Args:
            archive_path: full path to the archive to check
            temp_root_dir: root of the temporary space the downstream
                checker will work in

        """
        self.log.info('Check %s\'s metadata', archive_path)

        extension = os.path.splitext(archive_path)[-1]
        if extension not in ('.gz', '.zip'):
            self.log.warning('Skip %s. Only zip or gz extension files.',
                             archive_path)
            return

        parent_dir = os.path.dirname(archive_path)
        # project.json (next to the archive) contains the repoType field
        project_json = os.path.join(parent_dir, REPO_TYPE_FILENAME)

        meta = utils.load_meta(project_json)
        if not meta:
            self.log.error('Skip %s. No project.json was detected.',
                           archive_path)
            return

        repo_type = meta[REPO_TYPE_KEY]

        if repo_type == 'svn' and extension == '.zip':
            self.log.warning('Skip %s. Only svndump for svn type repository.',
                             archive_path)
            return

        # check that the file is complete (a small number of archives
        # fail because of truncated downloads)
        json_meta = utils.load_meta(archive_path + '.json')
        if not json_meta:
            # no size metadata to compare against; don't crash on
            # json_meta['size'] below
            self.log.error('Skip %s. No json metadata was detected.',
                           archive_path)
            return

        length = os.path.getsize(archive_path)
        if length != int(json_meta['size']):  # somehow incomplete
            r = SWHGoogleArchiveFetcher().retrieve_source(archive_path,
                                                          json_meta,
                                                          archive_path)
            if not r:
                self.log.error('%s PROBLEM when fetching archive',
                               archive_path)
                return

        from swh.scheduler.celery_backend.config import app
        from . import tasks  # noqa

        if length < DEFAULT_SMALL_LENGTH_DISPATCH:
            checker = app.tasks['swh.fetcher.googlecode.tasks.SWHGoogleSmallArchiveCheckerTask']  # noqa
        elif length < DEFAULT_MEDIUM_LENGTH_DISPATCH:
            checker = app.tasks['swh.fetcher.googlecode.tasks.SWHGoogleMediumArchiveCheckerTask']  # noqa
        else:
            checker = app.tasks['swh.fetcher.googlecode.tasks.SWHGoogleHugeArchiveCheckerTask']  # noqa

        checker.delay(archive_path, repo_type, temp_root_dir)
def basic_check(archive_path, temp_dir, cmd):
    """Execute a basic integrity check on a git or hg archive.

    Args:
        archive_path: the full pathname to the archive to check
        temp_dir: the temporary directory to load and check the repository
        cmd: the actual command checking that the repository is ok
             (e.g. ['git', 'fsck'] or ['hg', 'verify'])

    Returns:
        True in case check is ok, False otherwise.

    """
    # all git and hg archives contain one folder with the project name.
    # Use a distinct name for the unzip command so the verification
    # command passed as `cmd` is not clobbered (the previous code rebound
    # `cmd` here and re-ran unzip instead of the actual check).
    unzip_cmd = ['unzip', '-q', '-o', archive_path, '-d', temp_dir]
    check_call(unzip_cmd)

    # Retrieve the archive content's first level folder (which cannot
    # be determined - in majority the name corresponds to the
    # project's name but not always...)
    repo_path = glob.glob(temp_dir + '/*')[0]
    with utils.cwd(repo_path):
        try:
            r = check_call(cmd)
            return r == 0
        except Exception:  # CalledProcessError, OSError (missing tool), ...
            return False


def check_svn_integrity(archive_path, temp_dir):
    """Check the repository's svn integrity.

    Args:
        archive_path: the full pathname to the svndump archive to check
        temp_dir: the temporary directory to load and check the repository

    Returns:
        True in case check is ok, False otherwise.

    """
    project_name = os.path.basename(os.path.dirname(archive_path))
    repo_path = os.path.join(temp_dir, project_name)

    # create the repository that will be loaded with the dump
    check_call(['svnadmin', 'create', repo_path])

    try:
        # pigz decompresses the dump on the fly; svnadmin loads it from
        # the pipe
        with Popen(['pigz', '-dc', archive_path], stdout=PIPE) as dump:
            r = check_call(['svnadmin', 'load', '-q', repo_path],
                           stdin=dump.stdout)
            return r == 0
    except Exception:  # CalledProcessError, OSError (missing tool), ...
        return False


def check_integrity(repo_type, archive_path, temp_dir):
    """Given a repository to uncompress in temp_dir with type repo_type,
    check its integrity.

    Args:
        repo_type: one of 'git', 'hg', 'svn'
        archive_path: the full pathname to the archive to check
        temp_dir: the temporary directory to load and check the repository

    Returns:
        True in case check is ok, False otherwise.

    Raises:
        NotImplementedError: for any other repo_type.

    """
    if repo_type == 'git':
        return basic_check(archive_path, temp_dir, cmd=['git', 'fsck'])
    if repo_type == 'hg':
        return basic_check(archive_path, temp_dir, cmd=['hg', 'verify'])
    if repo_type == 'svn':
        return check_svn_integrity(archive_path, temp_dir)
    # NotImplementedError is the proper exception class; the constant
    # `NotImplemented` is not callable and would raise TypeError instead.
    raise NotImplementedError("Repository type %s not implemented."
                              % repo_type)
""" self.log.info('Check %s\'s metadata' % archive_path) - extension = os.path.splitext(archive_path)[-1] - if extension != '.gz' and extension != '.zip': - self.log.warn('Skip %s. Only zip or gz extension files.' % - archive_path) - return - - parent_dir = os.path.dirname(archive_path) - # contains the repoType field - project_json = os.path.join(parent_dir, REPO_TYPE_FILENAME) - - meta = utils.load_meta(project_json) - if not meta: - self.log.error('Skip %s. No project.json was detected.' % - archive_path) - return - - repo_type = meta[REPO_TYPE_KEY] - - if repo_type == 'svn' and extension == '.zip': - self.log.warn('Skip %s. Only svndump for svn type repository.' % - archive_path) - return - - temp_dir = '' try: - # check that the file's complete (some small numbers of files - # fails because of it) - json_meta = utils.load_meta(archive_path + '.json') - length = os.path.getsize(archive_path) - if length != int(json_meta['size']): # somehow incomplete - r = SWHGoogleArchiveFetcher().retrieve_source(archive_path, - json_meta, - archive_path) - if not r: - self.log.error('%s PROBLEM when fetching archive' % - archive_path) - return - # compute the repo path repository temp_dir = tempfile.mkdtemp(suffix='.swh.fetcher.googlecode', prefix='tmp.', dir=temp_root_dir) self.log.debug('type: %s, archive: %s' % (repo_type, archive_path)) if check_integrity(repo_type, archive_path, temp_dir): self.log.info('%s SUCCESS' % archive_path) else: # we'll check that the current file is complete self.log.error('%s FAILURE' % archive_path) except Exception as e: self.log.error('%s PROBLEM with archive - %s' % (archive_path, e)) finally: # cleanup the temporary directory if os.path.exists(temp_dir): shutil.rmtree(temp_dir) diff --git a/swh/fetcher/googlecode/checker_producer.py b/swh/fetcher/googlecode/checker_producer.py index c8491f4..5a00136 100644 --- a/swh/fetcher/googlecode/checker_producer.py +++ b/swh/fetcher/googlecode/checker_producer.py @@ -1,33 +1,33 @@ # Copyright (C) 
# Copyright (C) 2015-2016  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import click
import sys

task_name = 'swh.fetcher.googlecode.tasks.SWHGoogleArchiveDispatchCheckerTask'


@click.command()
@click.option('--archive-path', help="Archive path to check")
@click.option('--temp-dir',
              help="Temporary folder to make check computations on archive.")
def produce(archive_path, temp_dir):
    """Send dispatch-checker tasks: one for --archive-path when given,
    otherwise one per archive path read from stdin.

    """
    from swh.scheduler.celery_backend.config import app
    from swh.fetcher.googlecode import tasks  # noqa

    checker_task = app.tasks[task_name]

    if archive_path:
        # debug mode: a single archive
        checker_task.delay(archive_path, temp_dir)
        return

    # batch mode: one archive path per input line
    for line in sys.stdin:
        path = line.rstrip()
        print(path)
        checker_task.delay(path, temp_dir)


if __name__ == '__main__':
    produce()
""" task_queue = 'swh_fetcher_googlecode_fetch_archive' def run(self, archive_gs, destination_rootpath): SWHGoogleArchiveFetcher().process(archive_gs, destination_rootpath) -class SWHGoogleArchiveCheckerTask(Task): +class SWHGoogleArchiveDispatchCheckerTask(Task): """Main task to check fetched archive files from google code archive server. - The checks are more thorough, that is: + Check the length of the archives. + If archive's length is not ok, refetch it. + When done, depending on its size, dispatch: + - large: to SWHGoogleArchiveCheckerHugeTask + - small: to SWHGoogleArchiveCheckerSmallTask - uncompress the archive on a temporary folder - integrity check according to repo's nature (git, hg, svn) """ task_queue = 'swh_fetcher_googlecode_check_archive' def run(self, path, root_temp_dir): - SWHGoogleArchiveChecker().process(path, root_temp_dir) + SWHGoogleArchiveDispatchChecker().process(path, root_temp_dir) + + +class SWHGoogleArchiveCheckerTask(Task): + """Main task to check huge fetched archive files from google code + archive server. + + The checks are more thorough, that is: + - uncompress the archive on a temporary folder + - integrity check according to repo's nature (git, hg, svn) + + Intended to be inherited (cf. SWHGoogleSmallArchiveCheckerTask, + SWHGoogleMediumArchiveCheckerTask, + SWHGoogleHugeArchiveCheckerTask) + + """ + def run(self, archive_path, repo_type, root_temp_dir): + """Process a repo archive archive_path of type repo_type. + The archive is uncompressed in root_temp_dir. 
class SWHGoogleArchiveCheckerTask(Task):
    """Base task thoroughly checking one fetched google code archive:
    - uncompress the archive in a temporary folder
    - integrity check according to the repo's nature (git, hg, svn)

    Intended to be inherited (cf. SWHGoogleSmallArchiveCheckerTask,
    SWHGoogleMediumArchiveCheckerTask, SWHGoogleHugeArchiveCheckerTask)
    so each archive size class consumes its own queue.

    """
    def run(self, archive_path, repo_type, root_temp_dir):
        """Process a repo archive archive_path of type repo_type.
        The archive is uncompressed in root_temp_dir.

        """
        checker = SWHGoogleArchiveChecker()
        checker.process(archive_path, repo_type, root_temp_dir)


class SWHGoogleSmallArchiveCheckerTask(SWHGoogleArchiveCheckerTask):
    # archives below DEFAULT_SMALL_LENGTH_DISPATCH
    task_queue = 'swh_fetcher_googlecode_check_small_archive'


class SWHGoogleMediumArchiveCheckerTask(SWHGoogleArchiveCheckerTask):
    # archives below DEFAULT_MEDIUM_LENGTH_DISPATCH
    task_queue = 'swh_fetcher_googlecode_check_medium_archive'


class SWHGoogleHugeArchiveCheckerTask(SWHGoogleArchiveCheckerTask):
    # everything larger
    task_queue = 'swh_fetcher_googlecode_check_huge_archive'