diff --git a/swh/__init__.py b/swh/__init__.py
index 6020b8c..fdffa2a 100644
--- a/swh/__init__.py
+++ b/swh/__init__.py
@@ -1,7 +1 @@
-# Copyright (C) 2015 Stefano Zacchiroli,
-# Antoine R. Dumont
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
 # placeholder
diff --git a/swh/file.py b/swh/file.py
index 982868f..369203b 100644
--- a/swh/file.py
+++ b/swh/file.py
@@ -1,47 +1,46 @@
-# Copyright (C) 2015 Stefano Zacchiroli,
-# Antoine R. Dumont
+# Copyright (C) 2015 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import logging
 import shutil
 import os
 
 from swh import proc
 
 
 def touch(path):
     """Create an empty file.
 
     """
     logging.info('Create temporary file %s' % path)
     open(path, 'a').close()
 
 
 def rsync(host, access_command, from_folder, to_folder):
     """Rsync from_folder to to_folder on the given host.
 
     """
     return proc.execute(['rsync', '-az', '--delete', '-e', access_command,
                          '--relative', from_folder,
                          '%s:%s' % (host, to_folder)])
 
 
 def rm(folder):
     """Remove a folder from the local filesystem.
 
     """
     if os.path.exists(folder):
         logging.info('task cleanup %s' % folder)
         shutil.rmtree(folder)
     else:
         logging.info('Folder %s does not exist. Nothing to clean up.'
                      % folder)
 
 
 def count_size_in_bytes(folder):
     """Compute the size of a folder in bytes.
 
     """
     return int(proc.execute(['du', '-bs', folder]).split()[0])
diff --git a/swh/git.py b/swh/git.py
index 41194ce..31a1500 100755
--- a/swh/git.py
+++ b/swh/git.py
@@ -1,51 +1,50 @@
-# Copyright (C) 2015 Stefano Zacchiroli,
-# Antoine R. Dumont
+# Copyright (C) 2015 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import os
 import logging
 import requests
 
 from swh import file, proc
 
 
 def clone_or_nothing(repo_url, mount_repo_folder, repo_name, witness_filename):
     """Clone the git repository repo_url named repo_name (user/repo).
 
     Check whether a witness file exists. If it exists, do nothing.
     Otherwise, check for an existing clone: if one exists it is a
     partial clone, so clean it up; then clone the repository and
     create a witness file afterwards.
 
     This function returns a tuple: first the full path to the local
     clone, second the output of the git clone command.
 
     """
     full_repo_folder = os.path.join(mount_repo_folder, repo_name[0],
                                     repo_name)
     full_witness_filename = os.path.join(full_repo_folder, witness_filename)
     if os.path.isfile(full_witness_filename):
         logging.warning('Clone %s already present!' % repo_name)
         return full_repo_folder, ''
     else:
         logging.info('Clone %s in %s' % (repo_url, full_repo_folder))
         if os.path.isdir(full_repo_folder):  # clean up partial repository
             logging.info('Clean partial clone %s!' % repo_name)
             file.rm(full_repo_folder)
         stdout = basic_clone(repo_url, full_repo_folder)
         file.touch(full_witness_filename)
         return full_repo_folder, stdout
 
 
 def basic_clone(repo_url, repo_dest_folder):
     """Clone a repository url in the destination folder `repo_dest_folder`.
 
     Returns the output of the git clone command.
 
     """
     return proc.execute(['git', 'clone', '--bare', '--quiet',
                          repo_url, repo_dest_folder])
 
 
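Note: the witness-file dance in clone_or_nothing is what makes re-delivered clone tasks idempotent: the marker is created only after git clone returns, so a missing marker plus an existing directory can only mean an interrupted clone. A self-contained sketch of the same pattern, with invented paths and URL (not taken from this diff):

    import os
    import shutil
    import subprocess

    def clone_once(repo_url, mount, repo_name, witness='witness'):
        # shard by the first letter of the name, like clone_or_nothing does
        dest = os.path.join(mount, repo_name[0], repo_name)
        marker = os.path.join(dest, witness)
        if os.path.isfile(marker):
            return dest, ''        # marker present: a previous clone completed
        if os.path.isdir(dest):
            shutil.rmtree(dest)    # directory without marker: partial clone
        out = subprocess.check_output(
            ['git', 'clone', '--bare', '--quiet', repo_url, dest],
            universal_newlines=True)
        open(marker, 'a').close()  # mark success only once the clone finished
        return dest, out

    # e.g. clone_once('https://example.org/user/repo.git',
    #                 '/tmp/swh-git-cloner/', 'user/repo')
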
 def fsck(repo_folder):
     """Run git fsck on a git repository.
 
     Return the output of the git fsck command.
 
     """
     return proc.execute(['git', '--git-dir', repo_folder,
                          'fsck', '--full'])
diff --git a/swh/proc.py b/swh/proc.py
index 5199ef3..31d54ae 100644
--- a/swh/proc.py
+++ b/swh/proc.py
@@ -1,13 +1,12 @@
-# Copyright (C) 2015 Stefano Zacchiroli,
-# Antoine R. Dumont
+# Copyright (C) 2015 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import subprocess
 
 
 def execute(command):
     """Execute the command and return its output.
 
     """
     return subprocess.check_output(command, universal_newlines=True)
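
Note: execute is a thin wrapper over subprocess.check_output, so command failure surfaces as an exception rather than a return code: any non-zero exit raises subprocess.CalledProcessError, which is what ultimately fails a worker task. A minimal sketch (the failing command is illustrative):

    import subprocess

    try:
        subprocess.check_output(['git', 'clone', 'not-a-real-url'],
                                universal_newlines=True)
    except subprocess.CalledProcessError as e:
        # e.returncode holds the exit status; e.output the captured stdout
        print('command failed with status', e.returncode)
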
diff --git a/swh/producer/__init__.py b/swh/producer/__init__.py
index 6020b8c..fdffa2a 100644
--- a/swh/producer/__init__.py
+++ b/swh/producer/__init__.py
@@ -1,7 +1 @@
-# Copyright (C) 2015 Stefano Zacchiroli,
-# Antoine R. Dumont
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
 # placeholder
diff --git a/swh/producer/clones.py b/swh/producer/clones.py
index 9dddec6..b2f4d5e 100644
--- a/swh/producer/clones.py
+++ b/swh/producer/clones.py
@@ -1,32 +1,31 @@
-# Copyright (C) 2015 Stefano Zacchiroli,
-# Antoine R. Dumont
+# Copyright (C) 2015 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import logging
 
 from swh.storage import db, models
 from swh.worker import tasks
 
 
 def produce(conf):
     """Make workers clone repositories.
 
     """
     db_url = conf['db_url']
     table_clones = conf['table_clones']
     limit = conf['limit']
     offset = conf['offset']
     repository_scheme = conf['repository_scheme']
 
     with db.connect(db_url) as db_conn:
         repos = models.load_random_sample(db_conn, table_clones,
                                           limit, offset)
         for (repo_id, repo_name) in repos:
             logging.info('load repo %s into queue' % repo_name)
             repo_url = repository_scheme % repo_name
             model_data = {'repo_id': repo_id,
                           'repo_url': repo_url,
                           'repo_name': repo_name}
             tasks.orchestrate_clone_with_measure.delay(model_data)
diff --git a/swh/storage/__init__.py b/swh/storage/__init__.py
index 6020b8c..fdffa2a 100644
--- a/swh/storage/__init__.py
+++ b/swh/storage/__init__.py
@@ -1,7 +1 @@
-# Copyright (C) 2015 Stefano Zacchiroli,
-# Antoine R. Dumont
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
 # placeholder
diff --git a/swh/storage/db.py b/swh/storage/db.py
index 2a280e2..c933d55 100644
--- a/swh/storage/db.py
+++ b/swh/storage/db.py
@@ -1,78 +1,77 @@
-# Copyright (C) 2015 Stefano Zacchiroli,
-# Antoine R. Dumont
+# Copyright (C) 2015 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import psycopg2
 
 
 def connect(db_url):
     """Open a db connection.
 
     """
     return psycopg2.connect(db_url)
 
 
 def execute(cur, query_params, trace=None):
     """Execute query_params on the cursor cur.
 
     query_params is expected to be either:
     - a sql query (string)
     - a tuple (sql query, params)
 
     """
     if isinstance(query_params, str):
         cur.execute(query_params)
     else:
         if trace is not None:
             print("mogrify: ", cur.mogrify(*query_params).decode())
         cur.execute(*query_params)
 
 
 def copy_from(cur, file, table):
     """Copy the content of a file into the given db table.
 
     """
     cur.copy_from(file, table)
 
 
 def query_execute(db_conn, query_params):
     """Execute one query with no result expected.
 
     Type of sql queries: insert, delete, drop, create...
 
     query_params is expected to be either:
     - a sql query (string)
     - a tuple (sql query, params)
 
     """
     with db_conn.cursor() as cur:
         execute(cur, query_params)
 
 
 def queries_execute(db_conn, queries_params, trace=None):
     """Execute multiple queries without any result expected.
 
     Type of sql queries: insert, delete, drop, create...
 
     queries_params is expected to be a list of mixed:
     - sql queries (strings)
     - tuples (sql query, params)
 
     """
     with db_conn.cursor() as cur:
         for query_params in queries_params:
             execute(cur, query_params, trace)
 
 
 def query_fetchone(db_conn, query_params):
     """Execute a sql query which returns a single result.
 
     query_params is expected to be either:
     - a sql query (string)
     - a tuple (sql query, params)
 
     """
     with db_conn.cursor() as cur:
         execute(cur, query_params)
         return cur.fetchone()
 
 
 def query_fetch(db_conn, query_params, trace=None):
     """Execute a sql query which returns results.
 
     query_params is expected to be either:
     - a sql query (string)
     - a tuple (sql query, params)
 
     """
     with db_conn.cursor() as cur:
         execute(cur, query_params, trace)
         return cur.fetchall()
diff --git a/swh/storage/models.py b/swh/storage/models.py
index b8314fd..278a5eb 100644
--- a/swh/storage/models.py
+++ b/swh/storage/models.py
@@ -1,63 +1,62 @@
-# Copyright (C) 2015 Stefano Zacchiroli,
-# Antoine R. Dumont
+# Copyright (C) 2015 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from swh.storage import db
 
 
 def load_repos(db_conn, limit=10):
     """List repositories, up to `limit` entries (10 by default).
 
     NOT USED YET?!
 
     """
     return db.query_fetch(db_conn, ("""SELECT id, full_name
                                        FROM orig_repos
                                        LIMIT %s""", (limit,)))
 
 
 def load_random_1_percent_repos(db_conn):
     """Load 1 percent of the repositories at random.
 
     NOT USED YET?!
 
     """
     return db.query_fetch(db_conn, """SELECT id, full_name
                                       FROM repos_random_sample(1)
                                       UNION ALL
                                       SELECT id, full_name
                                       FROM repos_well_known();""")
 
 
 def load_random_sample(db_conn, table='sample', limit=None, offset=None):
     """Load the table containing the random sample of repositories
     to fetch.
 
     """
     if limit:
         query_limit = ' order by id limit ' + str(limit)
         query_limit += '' if not offset else ' offset ' + str(offset)
     else:
         query_limit = ''
 
     query = 'SELECT id, full_name FROM ' + table + query_limit
     return db.query_fetch(db_conn, query)
 
 
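Note: load_random_sample splices limit/offset into the SQL by string concatenation, which is safe only because the values come from the trusted worker config. A parameterized variant is sketched below, reusing the (query, params) tuple convention of swh.storage.db shown above; this function body is hypothetical, not part of the diff:

    from swh.storage import db

    def load_random_sample(db_conn, table='sample', limit=None, offset=None):
        # the table name cannot be a bind parameter, but limit/offset can
        query = 'SELECT id, full_name FROM ' + table
        params = []
        if limit is not None:
            query += ' ORDER BY id LIMIT %s'
            params.append(int(limit))
            if offset is not None:
                query += ' OFFSET %s'
                params.append(int(offset))
        return db.query_fetch(db_conn, (query, tuple(params)))
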
""" return db.query_execute(db_conn, ("""INSERT INTO crawl_history (repo, task_id, date, duration, status, result, stdout, stderr) VALUES(%s, %s, %s, %s, %s, %s, %s, %s) """, (repo_id, task_id, task_start_date, task_duration, status, json_result, stdout, stderr))) diff --git a/swh/worker/__init__.py b/swh/worker/__init__.py index 6020b8c..fdffa2a 100644 --- a/swh/worker/__init__.py +++ b/swh/worker/__init__.py @@ -1,7 +1 @@ -# Copyright (C) 2015 Stefano Zacchiroli , -# Antoine R. Dumont -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - # placeholder diff --git a/swh/worker/celery.py b/swh/worker/celery.py index 9e4a1e7..62521cf 100644 --- a/swh/worker/celery.py +++ b/swh/worker/celery.py @@ -1,73 +1,72 @@ # coding: utf-8 -# Copyright (C) 2015 Stefano Zacchiroli , -# Antoine R. Dumont +# Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from __future__ import absolute_import from celery import Celery from . import conf app = Celery('swh', include=['swh.worker.tasks']) CONF = conf.read_conf() # source: http://celery.readthedocs.org/en/latest/configuration.html # Optional configuration, see the application user guide. app.conf.update( # Default broker URL. This must be an URL in the form of: # transport://userid:password@hostname:port/virtual_host # Only the scheme part (transport://) is required, the rest is optional, and # defaults to the specific transports default values. # The transport part is the broker implementation to use, and the default is # amqp, which uses librabbitmq by default or falls back to pyamqp if that is # not installed. Also there are many other choices including redis, beanstalk, # sqlalchemy, django, mongodb, couchdb. It can also be a fully qualified path # to your own transport implementation. BROKER_URL = CONF['queue_url'], # 'amqp://guest:guest@localhost:5672//', # Time (in seconds, or a timedelta object) for when after stored task # tombstones will be deleted. CELERY_TASK_RESULT_EXPIRES=3600, # Late ack means the task messages will be acknowledged after the task has # been executed, not just before, which is the default behavior. CELERY_ACKS_LATE=True, # The backend used to store task results (tombstones). # Disabled by default. Can be one of the rpc, amqp, postgres, redit, cache, # mongodb, cassandra, etc... # CELERY_RESULT_BACKEND='rpc://', # CELERY_RESULT_BACKEND='db+postgresql://scott:tiger@localhost/mydatabase', # CELERY_RESULT_PERSISTENT=True, # A string identifying the default serialization method to use. # Can be pickle (default), json, yaml, msgpack or any custom serialization # methods that have been registered with kombu.serialization.registry CELERY_ACCEPT_CONTENT=['pickle', 'json'], # If True the task will report its status as “started” # when the task is executed by a worker. CELERY_TRACK_STARTED=True, # Default compression used for task messages. Can be gzip, bzip2 # (if available), or any custom compression schemes registered # in the Kombu compression registry. # CELERY_MESSAGE_COMPRESSION='bzip2', # Disable all rate limits, even if tasks has explicit rate limits set. # (Disabling rate limits altogether is recommended if you don’t have any # tasks using them.) 
diff --git a/swh/worker/celery.py b/swh/worker/celery.py
index 9e4a1e7..62521cf 100644
--- a/swh/worker/celery.py
+++ b/swh/worker/celery.py
@@ -1,73 +1,72 @@
 # coding: utf-8
-# Copyright (C) 2015 Stefano Zacchiroli,
-# Antoine R. Dumont
+# Copyright (C) 2015 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from __future__ import absolute_import
 
 from celery import Celery
 
 from . import conf
 
 app = Celery('swh', include=['swh.worker.tasks'])
 
 CONF = conf.read_conf()
 
 # source: http://celery.readthedocs.org/en/latest/configuration.html
 # Optional configuration, see the application user guide.
 app.conf.update(
     # Default broker URL. This must be a URL of the form:
     # transport://userid:password@hostname:port/virtual_host
     # Only the scheme part (transport://) is required; the rest is optional
     # and defaults to the specific transport's default values.
     # The transport part is the broker implementation to use, and the default
     # is amqp, which uses librabbitmq by default or falls back to pyamqp if
     # that is not installed. There are many other choices, including redis,
     # beanstalk, sqlalchemy, django, mongodb, couchdb. It can also be a fully
     # qualified path to your own transport implementation.
     BROKER_URL=CONF['queue_url'],  # e.g. 'amqp://guest:guest@localhost:5672//'
 
     # Time (in seconds, or a timedelta object) after which stored task
     # tombstones will be deleted.
     CELERY_TASK_RESULT_EXPIRES=3600,
 
     # Late ack means the task messages will be acknowledged after the task
     # has been executed, not just before, which is the default behavior.
     CELERY_ACKS_LATE=True,
 
     # The backend used to store task results (tombstones).
     # Disabled by default. Can be one of rpc, amqp, postgres, redis, cache,
     # mongodb, cassandra, etc.
     # CELERY_RESULT_BACKEND='rpc://',
     # CELERY_RESULT_BACKEND='db+postgresql://scott:tiger@localhost/mydatabase',
     # CELERY_RESULT_PERSISTENT=True,
 
     # A string identifying the default serialization method to use.
     # Can be pickle (default), json, yaml, msgpack or any custom
     # serialization methods that have been registered with
     # kombu.serialization.registry.
     CELERY_ACCEPT_CONTENT=['pickle', 'json'],
 
     # If True the task will report its status as "started"
     # when the task is executed by a worker.
     CELERY_TRACK_STARTED=True,
 
     # Default compression used for task messages. Can be gzip, bzip2
     # (if available), or any custom compression schemes registered
     # in the Kombu compression registry.
     # CELERY_MESSAGE_COMPRESSION='bzip2',
 
     # Disable all rate limits, even if tasks have explicit rate limits set.
     # (Disabling rate limits altogether is recommended if you don't have any
     # tasks using them.)
     CELERY_DISABLE_RATE_LIMITS=True,
 
     # Task hard time limit in seconds. The worker processing the task will
     # be killed and replaced with a new one when this is exceeded.
     # CELERYD_TASK_TIME_LIMIT=3600,
 
     # Task soft time limit in seconds.
     # The SoftTimeLimitExceeded exception will be raised when this is
     # exceeded. The task can catch this to e.g. clean up before the hard
     # time limit comes.
     CELERYD_TASK_SOFT_TIME_LIMIT=CONF['task_soft_time_limit'],
 )
 
 if __name__ == '__main__':
     app.start()
diff --git a/swh/worker/conf.py b/swh/worker/conf.py
index 452d766..a4ef1af 100644
--- a/swh/worker/conf.py
+++ b/swh/worker/conf.py
@@ -1,53 +1,52 @@
-# Copyright (C) 2015 Stefano Zacchiroli,
-# Antoine R. Dumont
+# Copyright (C) 2015 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import configparser
 import os
 
 # Default configuration file
 DEFAULT_CONF_FILE = '~/.config/swh/clones-worker.ini'
 
 # default configuration (can be overridden by the DEFAULT_CONF_FILE)
 DEFAULT_CONF = {
     # mount folder where to dump the clones
     'mount': '/tmp/swh-git-cloner/',
     # witness file created after cloning; it marks a successful clone
     'witness-file-name': 'witness',
     # the ssh access command (with key) to the ssh-host
     'ssh-access-command': 'ssh -i ~/.ssh/inria/id-rsa-swh-rsync -l swh',
     # ssh host
     'ssh-host': 'tytso.inria.fr',
     # destination folder on host
     'ssh-host-destination-folder': '',
     # url access to db
     'db_url': 'host= port= dbname= user= password=',
     # max file size limit allowed to git clone, in bytes
     'clone_limit_size': '4294967296',  # 4 GB
     # the queue url to access for consuming tasks
     'queue_url': 'amqp://guest:guest@localhost:5672//',
     # soft time limit for a task; if exceeded, the worker cleans up
     # and stops
     'task_soft_time_limit': '3600',
 }
 
 
 def read_conf(conf_file=DEFAULT_CONF_FILE):
     """Read the user's configuration file and overlay it on DEFAULT_CONF.
 
     Default keys are filled in when missing from the file.
 
     """
     config = configparser.ConfigParser(defaults=DEFAULT_CONF)
     config.read(os.path.expanduser(conf_file))
     conf = config._sections['main']
 
     # ensure the default keys are set if some are missing
     for key in DEFAULT_CONF:
         conf[key] = conf.get(key, DEFAULT_CONF[key])
 
     return conf
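
Note: read_conf overlays the [main] section of the ini file on top of DEFAULT_CONF, so a minimal config file only needs the keys it overrides. A self-contained sketch of that overlay behavior with configparser (the file content is invented):

    import configparser

    DEFAULTS = {'mount': '/tmp/swh-git-cloner/',
                'task_soft_time_limit': '3600'}

    config = configparser.ConfigParser(defaults=DEFAULTS)
    config.read_string("[main]\nmount = /srv/clones/\n")
    conf = dict(config['main'])
    print(conf['mount'])                 # '/srv/clones/'  (overridden)
    print(conf['task_soft_time_limit'])  # '3600'          (from the defaults)
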
diff --git a/swh/worker/tasks.py b/swh/worker/tasks.py
index 6835a26..f1d1b9b 100644
--- a/swh/worker/tasks.py
+++ b/swh/worker/tasks.py
@@ -1,198 +1,197 @@
-# Copyright (C) 2015 Stefano Zacchiroli,
-# Antoine R. Dumont
+# Copyright (C) 2015 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import resource
 import os
 import json
 
 from datetime import datetime
 
 from celery.utils import log
 from celery.exceptions import SoftTimeLimitExceeded
 
 from .celery import app, conf
 from swh import git, file
 from swh.storage import db, models
 
 logger = log.get_task_logger(__name__)
 
 MAIN_CONF = conf.read_conf()
 FSIZE_LIMIT = int(MAIN_CONF['clone_limit_size'])
 
 
 # FIXME: works around json's limitation of not knowing how to serialize
 # timedelta objects (cf. json encoder)
 def execute_with_measure(key_measure, fn, data, timedelta=False):
     """Execute fn with `data` as parameter and measure its wall clock time.
 
     - The result of the computation is returned with the measured time
       stored under the key `key_measure`.
     - timedelta determines the format of that time: if False (the
       default) it is stored in seconds, otherwise as a Python timedelta.
 
     """
     try:
         # tuple elements evaluate left to right, so start/end bracket fn(data)
         start, data, end = datetime.utcnow(), fn(data), datetime.utcnow()
         diff_time = end - start
         data.update({key_measure:
                      diff_time.total_seconds() if timedelta is False
                      else diff_time})
         return data
     except SoftTimeLimitExceeded:
         logger.error('Soft time limit exceeded. Clean up.')
         if 'clone_repo' in data:
             cleanup(data)
         raise  # re-raise for the error callback
 
 
 def filter_keys(data_keys, result):
     """Remove the no longer needed data_keys from result (if present).
 
     """
     for data_key in data_keys:
         if data_key in result:
             result.pop(data_key)
     return result
 
 
 def report_task_summary(task_id, status, result):
     """Build the tuple of values to persist as the task's result.
 
     """
     # retrieve the datetime and remove it from the data persisted as the
     # `result` json
     task_start_date = result.pop('task_start_date')
     # task_duration could be non-existent due to a runtime failure
     task_duration = result.pop('task_duration', None)
     result_stdout = result.pop('stdout', '')
     result_stderr = result.pop('stderr', '')
     result = filter_keys(['source_folder', 'clone_repo',
                           'repo_name', 'repo_url'], result)
     repo_id = result.pop('repo_id')
     json_result = json.dumps(result)
     return repo_id, task_id, task_start_date, task_duration, status, \
         json_result, result_stdout, result_stderr
 
 
 def persist(task_id, status, result):
     """Persist the task_id's result with status.
 
     """
     with db.connect(MAIN_CONF['db_url']) as db_conn:
         res = report_task_summary(task_id, status=status, result=result)
         models.persist_task_result(db_conn, *res)
 
 
 def on_success_cb(task, retval, task_id, args, kwargs):
     """Handler called when a task ends in 'SUCCESS' status.
 
     """
     persist(task_id, status=True, result=retval)
 
 
 def on_failure_cb(task, exc, task_id, args, kwargs, einfo):
     """Handler called when a task fails.
 
     """
     result, = args
     # potentially, errors occurred before the cleanup
     if 'clone_repo' in result:
         cleanup(result)
     result.update({'error_message': str(exc),
                    'stderr': str(einfo)})
     persist(task_id, status=False, result=result)
 
 
 @app.task(on_success=on_success_cb, on_failure=on_failure_cb)
 def orchestrate_clone_with_measure(model_data):
     """Orchestrate the cloning of a repository.
 
     """
     model_data.update({'task_start_date': datetime.utcnow()})
     return execute_with_measure('task_duration', orchestrate_clone,
                                 model_data, timedelta=True)
 
 
 def orchestrate_clone(model_data):
     """Clone, du, rsync, and clean up a repository.
 
     """
     model_data = execute_with_measure('time_clone', clone, model_data)
     if 'clone_repo' not in model_data:
         logger.info(model_data)
         return model_data
 
     model_data = size_as_bytes(model_data)
     logger.info(model_data)
 
     model_data = execute_with_measure('time_rsync', rsync, model_data)
     if 'time_rsync' not in model_data:
         logger.info(model_data)
         return model_data
 
     model_data = execute_with_measure('time_cleanup', cleanup, model_data)
     return model_data
 
 
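Note: every pipeline stage takes the model_data dict, enriches it, and returns it, and execute_with_measure brackets each stage with timestamps. A stripped-down, self-contained sketch of the same idea, without Celery or the cleanup hook:

    import time
    from datetime import datetime

    def with_measure(key, fn, data):
        start = datetime.utcnow()
        data = fn(data)                # each stage returns the enriched dict
        data[key] = (datetime.utcnow() - start).total_seconds()
        return data

    def slow_stage(data):
        time.sleep(0.1)
        data['touched'] = True
        return data

    print(with_measure('time_slow_stage', slow_stage, {}))
    # {'touched': True, 'time_slow_stage': 0.1...}
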
""" resource.setrlimit(resource.RLIMIT_FSIZE, (FSIZE_LIMIT, FSIZE_LIMIT)) source_folder = MAIN_CONF['mount'] clone_repo, stdout = git.clone_or_nothing(model_data['repo_url'], source_folder, model_data['repo_name'], MAIN_CONF['witness-file-name']) model_data.update({'source_folder': source_folder, 'clone_repo': clone_repo, 'stdout': stdout}) return model_data def fsck(model_data): """Execute a git fsck on a repository folder. """ stdout = git.fsck(model_data['clone_repo']) model_data['stdout'] = model_data.get('stdout', '') + stdout return model_data def rsync(model_data): """Execute rsync to the host from the current machine the source_folder/repo_name. repo_name is expected to be of form / """ prepared_source_folder = os.path.join(model_data['source_folder'], './', model_data['repo_name'][0], model_data['repo_name']) stdout = file.rsync(MAIN_CONF['ssh-host'], MAIN_CONF['ssh-access-command'], prepared_source_folder, MAIN_CONF.get('ssh-host-destination-folder', '')) model_data['stdout'] = model_data.get('stdout', '') + stdout return model_data def cleanup(model_data): """Remove the source_folder directory. """ file.rm(model_data['clone_repo']) return model_data def size_as_bytes(model_data): """Count the repository's size on disk. """ clone_repo = model_data['clone_repo'] model_data.update( {'clone_repo_size': file.count_size_in_bytes(clone_repo)}) return model_data