diff --git a/Makefile.local b/Makefile.local
index 9bb43ca..5c1850d 100644
--- a/Makefile.local
+++ b/Makefile.local
@@ -1,81 +1,78 @@
# -*- makefile -*-

FLAKE = flake8
BINDIR = bin
SRCDIR = swh
# add -v for example
FLAG=
NOSE = nosetests3
TESTFLAGS = -s
TESTDIR = ./swh/tests
DBNAME=swh-git-cloner
SQL=./swh.sql
SWH_CLONER=$(BINDIR)/swh-git-producer
PROFILE_TYPE=profile
FOLLOW_LOG=-f

deps:
	sudo apt-get install -y python3 \
		python3-pygit2 \
		python3-psycopg2 \
		python3-nose \
		python3-celery \
		celeryd \
		ipython3

prepare:
	mkdir -p /tmp/swh-git-cloner/log

clean:
	rm -rf /tmp/swh-git-cloner/log /tmp/swh-git-cloner

inspect-active:
	celery inspect active

inspect-queue:
	celery inspect active_queues

inspect-stats:
	celery inspect stats

run-1-worker:
-	celery worker --app=swh.worker.celery \
+	celery worker --app=swh.cloner.git.worker.celery \
		--pool=prefork \
		--concurrency=8 \
		-Ofair \
		--loglevel=info 2>&1 | tee -a w1.log

run-2-workers:
	celery multi start w1 w2 \
-		--app=swh.worker.celery
+		--app=swh.cloner.git.worker.celery
		--pool=prefork \
		-Ofair \
		--concurrency=8 \
		--loglevel=info \

stop-2-workers:
	celery multi stop w1 w2

create-db:
	dropdb --if-exists $(DBNAME)
	createdb $(DBNAME)
	psql $(DBNAME) < $(SQL)

drop-db:
	dropdb --if-exists $(DBNAME)

connect-db:
	psql $(DBNAME)

run-producer:
	PYTHONPATH=`pwd` $(SWH_CLONER) $(FLAG)
-
-check:
-	$(FLAKE) $(BINDIR) $(SRCDIR)
diff --git a/bin/swh-git-producer b/bin/swh-git-producer
index 8d080a2..9524e74 100755
--- a/bin/swh-git-producer
+++ b/bin/swh-git-producer
@@ -1,70 +1,70 @@
#!/usr/bin/env python3

# Copyright (C) 2015 Stefano Zacchiroli,
#                    Antoine R. Dumont
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import argparse
import configparser
import logging
import os

-from swh.producer import clones
+from swh.cloner.git.producer import clones

# Default configuration file
DEFAULT_CONF_FILE = '~/.config/swh/clones-producer.ini'

# default configuration (can be overridden by the DEFAULT_CONF_FILE)
DEFAULT_CONF = {
    'table_clones': 'sample',
    'limit': None,
    'offset': None,
    'log_dir': 'swh-git-cloner/log/',
    'db_url': 'dbname=github',
    'repository_scheme': 'https://github.com/%s',
    'debug': None
}


def parse_args():
    """Parse the configuration for the cli.
    """
    cli = argparse.ArgumentParser(
        description='Clone git repository on fs.')
    cli.add_argument('--verbose', '-v', action='store_true',
                     help='Verbosity level in log file.')
    cli.add_argument('--config', '-c', help='configuration file path')
    args = cli.parse_args()
    return args


def read_conf(args):
    """Read the user's configuration file.
    `args` may provide the path to an alternate configuration file.
    """
    config = configparser.ConfigParser(defaults=DEFAULT_CONF)
    conf_file = args.config or DEFAULT_CONF_FILE
    config.read(os.path.expanduser(conf_file))
    conf = config._sections['main']

    # ensure the default keys are set if some are missing
    for key in DEFAULT_CONF:
        conf[key] = conf.get(key, DEFAULT_CONF[key])

    return conf


if __name__ == '__main__':
    args = parse_args()
    conf = read_conf(args)

    log_filename = os.path.join(conf['log_dir'], 'cloner.log')
    logging.basicConfig(filename=log_filename,
                        level=logging.DEBUG if args.verbose
                        else logging.INFO)

    clones.produce(conf)
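Note on configuration: read_conf pulls config._sections['main'] directly, so the producer expects its settings under a [main] section and receives the values as raw (uninterpolated) strings; missing keys are then filled from DEFAULT_CONF. A minimal sketch of ~/.config/swh/clones-producer.ini, with illustrative values:

    [main]
    db_url = dbname=github
    table_clones = sample
    repository_scheme = https://github.com/%s
    log_dir = /tmp/swh-git-cloner/log
    limit = 100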
""" config = configparser.ConfigParser(defaults=DEFAULT_CONF) conf_file = args.config or DEFAULT_CONF_FILE config.read(os.path.expanduser(conf_file)) conf = config._sections['main'] # ensure the default keys are set if some are missing for key in DEFAULT_CONF: conf[key] = conf.get(key, DEFAULT_CONF[key]) return conf if __name__ == '__main__': args = parse_args() conf = read_conf(args) log_filename = os.path.join(conf['log_dir'], 'cloner.log') logging.basicConfig(filename=log_filename, level=logging.DEBUG if args.verbose else logging.INFO) clones.produce(conf) diff --git a/swh/cloner/__init__.py b/swh/cloner/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/swh/cloner/git/__init__.py b/swh/cloner/git/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/swh/file.py b/swh/cloner/git/file.py similarity index 98% rename from swh/file.py rename to swh/cloner/git/file.py index 369203b..75772ec 100644 --- a/swh/file.py +++ b/swh/cloner/git/file.py @@ -1,46 +1,46 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging import shutil import os -from swh import proc +from . import proc def touch(path): """Create an empty file. """ logging.info('Create temporary file %s' % path) open(path, 'a').close() def rsync(host, access_command, from_folder, to_folder): """Execute a rsync action from a source from to a to_folder over the host. """ return proc.execute(['rsync', '-az', '--delete', '-e', access_command, '--relative', from_folder, '%s:%s' % (host, to_folder)]) def rm(folder): """Remove a folder from the local filesystem. """ if os.path.exists(folder): logging.info('task cleanup %s' % folder) shutil.rmtree(folder) else: logging.info('Folder %s does not exist. Nothing to clean up.' % folder) def count_size_in_bytes(folder): """Compute the bytes size of a folder. """ return int(proc.execute(['du', '-bs', folder]).split()[0]) diff --git a/swh/git.py b/swh/cloner/git/git.py similarity index 97% rename from swh/git.py rename to swh/cloner/git/git.py index 31a1500..9a575e2 100755 --- a/swh/git.py +++ b/swh/cloner/git/git.py @@ -1,50 +1,49 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import logging -import requests -from swh import file, proc +from swh.cloner.git import file, proc def clone_or_nothing(repo_url, mount_repo_folder, repo_name, witness_filename): """Clone a git repo_name (user/repo) repository repo_url. Check if a witness file exists. If it exists, do nothing. Else, check the existence of the clone. If it exists, it's a partial clone so it's cleaned-up. Otherwise, clone the repository and create a witness file after that. This function returns a tuple, first the full path to the local clone and second, the git clone command output. """ full_repo_folder = os.path.join(mount_repo_folder, repo_name[0], repo_name) full_witness_filename = os.path.join(full_repo_folder, witness_filename) if os.path.isfile(full_witness_filename): logging.warn('Clone %s already present!' 
diff --git a/swh/proc.py b/swh/cloner/git/proc.py
similarity index 100%
rename from swh/proc.py
rename to swh/cloner/git/proc.py
diff --git a/swh/producer/__init__.py b/swh/cloner/git/producer/__init__.py
similarity index 100%
rename from swh/producer/__init__.py
rename to swh/cloner/git/producer/__init__.py
diff --git a/swh/producer/clones.py b/swh/cloner/git/producer/clones.py
similarity index 92%
rename from swh/producer/clones.py
rename to swh/cloner/git/producer/clones.py
index b2f4d5e..bbf82e3 100644
--- a/swh/producer/clones.py
+++ b/swh/cloner/git/producer/clones.py
@@ -1,31 +1,31 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import logging

-from swh.storage import db, models
-from swh.worker import tasks
+from swh.cloner.git.storage import db, models
+from swh.cloner.git.worker import tasks


def produce(conf):
    """Make workers clone repositories.
    """
    db_url = conf['db_url']
    table_clones = conf['table_clones']
    limit = conf['limit']
    offset = conf['offset']
    repository_scheme = conf['repository_scheme']

    with db.connect(db_url) as db_conn:
        repos = models.load_random_sample(db_conn,
                                          table_clones,
                                          limit,
                                          offset)
        for (repo_id, repo_name) in repos:
            logging.info('load repo %s into queue' % repo_name)
            repo_url = repository_scheme % repo_name
            model_data = {'repo_id': repo_id,
                          'repo_url': repo_url,
                          'repo_name': repo_name}
            tasks.orchestrate_clone_with_measure.delay(model_data)
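Concretely, for a sample row (42, 'octocat/Hello-World') and the default repository_scheme, each worker receives a payload shaped like this (values illustrative):

    model_data = {'repo_id': 42,
                  'repo_url': 'https://github.com/octocat/Hello-World',
                  'repo_name': 'octocat/Hello-World'}

orchestrate_clone_with_measure (tasks.py below) then threads this dict through every step, accumulating timings and command output as it goes.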
""" return db.query_fetch(db_conn, ("""SELECT id, full_name FROM orig_repos LIMIT %s""", (limit,))) def load_random_1_percent_repos(db_conn): """Load the 1 percent repositories at random. NOT USED YET?! """ return db.query_fetch(db_conn, """SELECT id, full_name FROM repos_random_sample(1) UNION ALL SELECT id, full_name, html_url FROM repos_well_known();""") def load_random_sample(db_conn, table='sample', limit=None, offset=None): """Load the table sample containing random sample of repositories to fetch. """ if limit: query_limit = ' order by id limit ' + limit query_limit += '' if not offset else ' offset ' + offset else: query_limit = '' query = 'SELECT id, full_name FROM ' + table + query_limit return db.query_fetch(db_conn, query) def persist_task_result(db_conn, repo_id, task_id, task_start_date, task_duration, status, json_result, stdout, stderr): """Persist the task's result. """ return db.query_execute(db_conn, ("""INSERT INTO crawl_history (repo, task_id, date, duration, status, result, stdout, stderr) VALUES(%s, %s, %s, %s, %s, %s, %s, %s) """, (repo_id, task_id, task_start_date, task_duration, status, json_result, stdout, stderr))) diff --git a/swh/worker/__init__.py b/swh/cloner/git/worker/__init__.py similarity index 100% rename from swh/worker/__init__.py rename to swh/cloner/git/worker/__init__.py diff --git a/swh/worker/celery.py b/swh/cloner/git/worker/celery.py similarity index 96% rename from swh/worker/celery.py rename to swh/cloner/git/worker/celery.py index 62521cf..f41b09e 100644 --- a/swh/worker/celery.py +++ b/swh/cloner/git/worker/celery.py @@ -1,72 +1,70 @@ # coding: utf-8 # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from __future__ import absolute_import - from celery import Celery from . import conf -app = Celery('swh', - include=['swh.worker.tasks']) +app = Celery('swh.cloner.git', + include=['swh.cloner.git.worker.tasks']) CONF = conf.read_conf() # source: http://celery.readthedocs.org/en/latest/configuration.html # Optional configuration, see the application user guide. app.conf.update( # Default broker URL. This must be an URL in the form of: # transport://userid:password@hostname:port/virtual_host # Only the scheme part (transport://) is required, the rest is optional, and # defaults to the specific transports default values. # The transport part is the broker implementation to use, and the default is # amqp, which uses librabbitmq by default or falls back to pyamqp if that is # not installed. Also there are many other choices including redis, beanstalk, # sqlalchemy, django, mongodb, couchdb. It can also be a fully qualified path # to your own transport implementation. BROKER_URL = CONF['queue_url'], # 'amqp://guest:guest@localhost:5672//', # Time (in seconds, or a timedelta object) for when after stored task # tombstones will be deleted. CELERY_TASK_RESULT_EXPIRES=3600, # Late ack means the task messages will be acknowledged after the task has # been executed, not just before, which is the default behavior. CELERY_ACKS_LATE=True, # The backend used to store task results (tombstones). # Disabled by default. Can be one of the rpc, amqp, postgres, redit, cache, # mongodb, cassandra, etc... 
diff --git a/swh/worker/__init__.py b/swh/cloner/git/worker/__init__.py
similarity index 100%
rename from swh/worker/__init__.py
rename to swh/cloner/git/worker/__init__.py
diff --git a/swh/worker/celery.py b/swh/cloner/git/worker/celery.py
similarity index 96%
rename from swh/worker/celery.py
rename to swh/cloner/git/worker/celery.py
index 62521cf..f41b09e 100644
--- a/swh/worker/celery.py
+++ b/swh/cloner/git/worker/celery.py
@@ -1,72 +1,70 @@
# coding: utf-8
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

-from __future__ import absolute_import
-
from celery import Celery

from . import conf

-app = Celery('swh',
-             include=['swh.worker.tasks'])
+app = Celery('swh.cloner.git',
+             include=['swh.cloner.git.worker.tasks'])

CONF = conf.read_conf()

# source: http://celery.readthedocs.org/en/latest/configuration.html
# Optional configuration, see the application user guide.
app.conf.update(
    # Default broker URL. This must be a URL of the form:
    # transport://userid:password@hostname:port/virtual_host
    # Only the scheme part (transport://) is required, the rest is optional
    # and defaults to the specific transport's default values.
    # The transport part is the broker implementation to use, and the
    # default is amqp, which uses librabbitmq by default or falls back to
    # pyamqp if that is not installed. Also there are many other choices
    # including redis, beanstalk, sqlalchemy, django, mongodb, couchdb. It
    # can also be a fully qualified path to your own transport
    # implementation.
    BROKER_URL=CONF['queue_url'],  # 'amqp://guest:guest@localhost:5672//',

    # Time (in seconds, or a timedelta object) after which stored task
    # tombstones will be deleted.
    CELERY_TASK_RESULT_EXPIRES=3600,

    # Late ack means the task messages will be acknowledged after the task
    # has been executed, not just before, which is the default behavior.
    CELERY_ACKS_LATE=True,

    # The backend used to store task results (tombstones).
    # Disabled by default. Can be one of rpc, amqp, postgres, redis, cache,
    # mongodb, cassandra, etc...
    # CELERY_RESULT_BACKEND='rpc://',
    # CELERY_RESULT_BACKEND='db+postgresql://scott:tiger@localhost/mydatabase',
    # CELERY_RESULT_PERSISTENT=True,

    # A string identifying the default serialization method to use.
    # Can be pickle (default), json, yaml, msgpack or any custom
    # serialization methods that have been registered with
    # kombu.serialization.registry
    CELERY_ACCEPT_CONTENT=['pickle', 'json'],

    # If True the task will report its status as 'started'
    # when the task is executed by a worker.
    CELERY_TRACK_STARTED=True,

    # Default compression used for task messages. Can be gzip, bzip2
    # (if available), or any custom compression schemes registered
    # in the Kombu compression registry.
    # CELERY_MESSAGE_COMPRESSION='bzip2',

    # Disable all rate limits, even if tasks have explicit rate limits set.
    # (Disabling rate limits altogether is recommended if you don't have
    # any tasks using them.)
    CELERY_DISABLE_RATE_LIMITS=True,

    # Task hard time limit in seconds. The worker processing the task will
    # be killed and replaced with a new one when this is exceeded.
    # CELERYD_TASK_TIME_LIMIT=3600,

    # Task soft time limit in seconds.
    # The SoftTimeLimitExceeded exception will be raised when this is
    # exceeded. The task can catch this to e.g. clean up before the hard
    # time limit comes.
    CELERYD_TASK_SOFT_TIME_LIMIT=CONF['task_soft_time_limit']
)


if __name__ == '__main__':
    app.start()
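The worker reads its runtime settings through conf.read_conf(); conf.py moves as a 100% rename (next hunk), so its body is not shown here, but the keys consumed in celery.py and tasks.py pin down what the configuration must provide: queue_url, task_soft_time_limit, db_url, clone_limit_size, mount, witness-file-name, ssh-host, ssh-access-command, and optionally ssh-host-destination-folder. Assuming the same INI layout as the producer's configuration, a plausible worker config (every value here is illustrative):

    [main]
    queue_url = amqp://guest:guest@localhost:5672//
    task_soft_time_limit = 3600
    db_url = dbname=swh-git-cloner
    clone_limit_size = 4294967296
    mount = /tmp/swh-git-cloner
    witness-file-name = witness
    ssh-host = storage.example.org
    ssh-access-command = ssh -i /path/to/key
    ssh-host-destination-folder = /srv/clones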
""" for data_key in data_keys: if data_key in result: result.pop(data_key) return result def report_task_summary(task_id, status, result): """Persist the task's result. """ # retrieve the datetime and remove it from data to persist in `result` json task_start_date = result.pop('task_start_date') # task_duration could be non-existent due to runtime failure task_duration = \ result.pop('task_duration') if 'task_duration' in result else None result_stdout = result.pop('stdout', '') if 'stdout' in result else '' result_stderr = result.pop('stderr', '') if 'stderr' in result else '' result = filter_keys(['source_folder', 'clone_repo', 'repo_name', 'repo_url'], result) repo_id = result.pop('repo_id') json_result = json.dumps(result) return repo_id, task_id, task_start_date, task_duration, status, \ json_result, result_stdout, result_stderr def persist(task_id, status, result): """Persist the task_id's result with status. """ # FIXME Deal with potential db problem... with db.connect(MAIN_CONF['db_url']) as db_conn: res = report_task_summary(task_id, status=status, result=result) models.persist_task_result(db_conn, *res) def on_success_cb(task, retval, task_id, args, kwargs): """Handler function called when a task is in 'SUCCESS' status! """ persist(task_id, status=True, result=retval) def on_failure_cb(task, exc, task_id, args, kwargs, einfo): """Handler function when a task is done! """ result, = args # potentially, errors occurred before the cleanup if 'clone_repo' in result: cleanup(result) result.update({'error_message': str(exc), 'stderr': str(einfo)}) persist(task_id, status=False, result=result) @app.task(on_success=on_success_cb, on_failure=on_failure_cb) def orchestrate_clone_with_measure(model_data): """Orchestrate the cloning of a repository. """ model_data.update({'task_start_date': datetime.utcnow()}) return execute_with_measure('task_duration', orchestrate_clone, model_data, timedelta=True) def orchestrate_clone(model_data): """Clone, fsck, du, rsync, cleanup a repository. """ model_data = execute_with_measure('time_clone', clone, model_data) if 'clone_repo' not in model_data: logger.info(model_data) return model_data model_data = size_as_bytes(model_data) logger.info(model_data) model_data = execute_with_measure('time_rsync', rsync, model_data) if 'time_rsync' not in model_data: logger.info(model_data) return model_data model_data = execute_with_measure('time_cleanup', cleanup, model_data) return model_data def clone(model_data): """Clone the repository. Error can be raised if the repository exceeds accepted limits. """ resource.setrlimit(resource.RLIMIT_FSIZE, (FSIZE_LIMIT, FSIZE_LIMIT)) source_folder = MAIN_CONF['mount'] clone_repo, stdout = git.clone_or_nothing(model_data['repo_url'], source_folder, model_data['repo_name'], MAIN_CONF['witness-file-name']) model_data.update({'source_folder': source_folder, 'clone_repo': clone_repo, 'stdout': stdout}) return model_data def fsck(model_data): """Execute a git fsck on a repository folder. """ stdout = git.fsck(model_data['clone_repo']) model_data['stdout'] = model_data.get('stdout', '') + stdout return model_data def rsync(model_data): """Execute rsync to the host from the current machine the source_folder/repo_name. 
repo_name is expected to be of form / """ prepared_source_folder = os.path.join(model_data['source_folder'], './', model_data['repo_name'][0], model_data['repo_name']) stdout = file.rsync(MAIN_CONF['ssh-host'], MAIN_CONF['ssh-access-command'], prepared_source_folder, MAIN_CONF.get('ssh-host-destination-folder', '')) model_data['stdout'] = model_data.get('stdout', '') + stdout return model_data def cleanup(model_data): """Remove the source_folder directory. """ file.rm(model_data['clone_repo']) return model_data def size_as_bytes(model_data): """Count the repository's size on disk. """ clone_repo = model_data['clone_repo'] model_data.update( {'clone_repo_size': file.count_size_in_bytes(clone_repo)}) return model_data
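With the renames in place, the end-to-end flow can be exercised with the Makefile targets above (PostgreSQL and RabbitMQ assumed running, and ./swh.sql present for create-db):

    make create-db       # recreate the swh-git-cloner database from ./swh.sql
    make run-1-worker    # terminal 1: start a prefork Celery worker
    make run-producer    # terminal 2: enqueue one clone task per sampled repo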