diff --git a/PKG-INFO b/PKG-INFO
index 4523f47..f71f216 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,10 +1,10 @@
 Metadata-Version: 1.0
 Name: swh.core
-Version: 0.0.8
+Version: 0.0.9
 Summary: Software Heritage core utilities
 Home-page: https://forge.softwareheritage.org/diffusion/DCORE/
 Author: Software Heritage developers
 Author-email: swh-devel@inria.fr
 License: UNKNOWN
 Description: UNKNOWN
 Platform: UNKNOWN
diff --git a/swh.core.egg-info/PKG-INFO b/swh.core.egg-info/PKG-INFO
index 4523f47..f71f216 100644
--- a/swh.core.egg-info/PKG-INFO
+++ b/swh.core.egg-info/PKG-INFO
@@ -1,10 +1,10 @@
 Metadata-Version: 1.0
 Name: swh.core
-Version: 0.0.8
+Version: 0.0.9
 Summary: Software Heritage core utilities
 Home-page: https://forge.softwareheritage.org/diffusion/DCORE/
 Author: Software Heritage developers
 Author-email: swh-devel@inria.fr
 License: UNKNOWN
 Description: UNKNOWN
 Platform: UNKNOWN
diff --git a/swh/core/hashutil.py b/swh/core/hashutil.py
index ff0679c..9a4ee54 100644
--- a/swh/core/hashutil.py
+++ b/swh/core/hashutil.py
@@ -1,106 +1,131 @@
 # Copyright (C) 2015 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import binascii
 import functools
 import hashlib
 import os

 from io import BytesIO

 # supported hashing algorithms
 ALGORITHMS = set(['sha1', 'sha256', 'sha1_git'])

+# All known hashing algorithms, i.e. the default ones plus the git
+# object-type specific variants
+KNOWN_ALGORITHMS = ALGORITHMS | set(['sha1_blob_git', 'sha1_tree_git',
+                                     'sha1_commit_git'])
+
 # should be a multiple of 64 (sha1/sha256's block size)
 # FWIW coreutils' sha1sum uses 32768
 HASH_BLOCK_SIZE = 32768


 def _new_hash(algo, length=None):
     """Initialize a digest object (as returned by python's hashlib) for the
     requested algorithm. See the constant ALGORITHMS for the list of supported
     algorithms.

     If a git-specific hashing algorithm is requested (e.g.,
-    "sha1_git"), the hashing object will be pre-fed with the needed header; for
+    "sha1_git", "sha1_blob_git", "sha1_tree_git", "sha1_commit_git"), the
+    hashing object will be pre-fed with the needed header; for
     this to work, length must be given.

+    Args:
+        algo: name of a hashing algorithm in KNOWN_ALGORITHMS
+        length: length of the content to hash. Can be None when hashing
+                with sha1 or sha256
+
+    Returns:
+        A digest object
+
+    Raises:
+        ValueError if a sha1_*git algorithm is requested with length None
+        ValueError if the git object type is not one of blob, commit, tree
+
     """
-    if algo not in ALGORITHMS:
+    if algo not in KNOWN_ALGORITHMS:
         raise ValueError('unknown hashing algorithm ' + algo)
     h = None
     if algo.endswith('_git'):
         if length is None:
             raise ValueError('missing length for git hashing algorithm')
-        h = hashlib.new(algo.split('_')[0])
-        h.update(('blob %d\0' % length).encode('ascii'))  # git hash header
+
+        algo_hash = algo.split('_')
+        h = hashlib.new(algo_hash[0])
+        obj_type = 'blob' if algo_hash[1] == 'git' else algo_hash[1]
+        if obj_type not in ('blob', 'commit', 'tree'):
+            raise ValueError(
+                'for git-style sha1 computation, the only supported object'
+                ' types are blob, commit and tree')
+
+        # git hash header
+        h.update(('%s %d\0' % (obj_type, length)).encode('ascii'))
     else:
         h = hashlib.new(algo)

     return h


 def _hash_file_obj(f, length, algorithms=ALGORITHMS, chunk_cb=None):
     """hash the content of a file-like object

     If chunk_cb is given, call it on each data chunk after updating the hash

     """
     hashers = {algo: _new_hash(algo, length)
                for algo in algorithms}
     while True:
         chunk = f.read(HASH_BLOCK_SIZE)
         if not chunk:
             break
         for h in hashers.values():
             h.update(chunk)
         if chunk_cb:
             chunk_cb(chunk)

     return {algo: hashers[algo].digest()
             for algo in hashers}


 def _hash_fname(fname, algorithms=ALGORITHMS):
     """hash the content of a file specified by file name

     """
     length = os.path.getsize(fname)
     with open(fname, 'rb') as f:
-        return _hash_file_obj(f, length)
+        return _hash_file_obj(f, length, algorithms)


 def hashfile(f, length=None, algorithms=ALGORITHMS):
     """Hash the content of a given file, given either as a file-like object
     or a file name. All specified hash algorithms will be computed, reading
     the file only once. Returns a dictionary mapping algorithm names to
     hex-encoded checksums.

     When passing a file-like object, content length must be given; when
     passing a file name, content length is ignored.

     """
     if isinstance(f, str):
         return _hash_fname(f, algorithms)
     else:
         return _hash_file_obj(f, length, algorithms)


 def hashdata(data, algorithms=ALGORITHMS):
     """Like hashfile, but hashes content passed as a string (of bytes)

     """
     buf = BytesIO(data)
     return _hash_file_obj(buf, len(data), algorithms)


 @functools.lru_cache()
 def hash_to_hex(hash):
     """Converts a hash to its hexadecimal string representation"""
     return binascii.hexlify(hash).decode('ascii')


 @functools.lru_cache()
 def hex_to_hash(hex):
     """Converts a hexadecimal string representation of a hash to that hash"""
     return bytes.fromhex(hex)
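
Note on the hashing scheme above: the `sha1_*git` algorithms reproduce git's
object identifiers, which are computed over a `"<type> <length>\0"` header
followed by the raw content. A minimal sketch of that equivalence, assuming
only `hashlib` plus the `hashutil` API exactly as introduced in this diff:

    import hashlib

    from swh.core import hashutil

    data = b'42\n'

    # git object id: sha1 over "<object type> <content length>\0" + content
    header = ('blob %d\0' % len(data)).encode('ascii')
    expected = hashlib.sha1(header + data).hexdigest()

    checksums = hashutil.hashdata(data, algorithms=hashutil.KNOWN_ALGORITHMS)
    assert hashutil.hash_to_hex(checksums['sha1_git']) == expected
    # 'sha1_blob_git' defaults to the same 'blob' object type
    assert checksums['sha1_blob_git'] == checksums['sha1_git']
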
diff --git a/swh/core/logger.py b/swh/core/logger.py
index 0720859..940d868 100644
--- a/swh/core/logger.py
+++ b/swh/core/logger.py
@@ -1,106 +1,87 @@
 # Copyright (C) 2015 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import logging
 import os
 import psycopg2
 import socket

 from psycopg2.extras import Json

 EXTRA_LOGDATA_PREFIX = 'swh_'


 def db_level_of_py_level(lvl):
     """convert a log level of the logging module to a log level suitable
     for the logging Postgres DB

     """
     return logging.getLevelName(lvl).lower()


 class PostgresHandler(logging.Handler):
     """log handler that stores messages in a Postgres DB

     See swh-core/sql/log-schema.sql for the DB schema.

     All logging methods can be used as usual. Additionally, arbitrary
     metadata can be passed to logging methods; it will be stored in the DB
     as a single JSONB value. To do so, pass a dictionary to the 'extra'
     kwarg of any logging method; all keys in that dictionary that start with
     EXTRA_LOGDATA_PREFIX (currently: 'swh_') will be extracted to form the
     JSONB dictionary. The prefix will be stripped and not included in the
     DB.

     Note: the logger name will be used to fill the 'module' DB column.

     Sample usage:

         logging.basicConfig(level=logging.INFO)
         h = PostgresHandler('dbname=softwareheritage-log')
         logger = logging.getLogger()
         logger.addHandler(h)

         logger.info('not so important notice',
                     extra={'swh_type': 'swh_logging_test',
                            'swh_meditation': 'guru'})
         logger.warn('something weird just happened, did you see that?')

     """

     def __init__(self, connstring):
         """
         Create a Postgres log handler.

         Args:
             connstring: libpq connection string to the log DB
         """
         super().__init__()

         self.connstring = connstring

-        # Attributes for pid-safe psycopg2 connection handling
-        self.__conn = None
-        self.__conn_pid = None
-
         self.fqdn = socket.getfqdn()  # cache FQDN value

     def _connect(self):
         return psycopg2.connect(self.connstring)

-    @property
-    def conn(self):
-        mypid = os.getpid()
-        # Reconnect if we changed pid or the connection is broken
-        if not self.__conn or self.__conn_pid != mypid or self.__conn.closed:
-            self.__conn = self._connect()
-            self.__conn_pid = mypid
-
-        return self.__conn
-
-    def close(self):
-        # Only close the connection if we created it
-        if self.__conn and self.__conn_pid == os.getpid():
-            self.__conn.close()
-        super().close()
-
     def emit(self, record):
         log_data = record.__dict__

         msg = self.format(record)
         extra_data = {k[len(EXTRA_LOGDATA_PREFIX):]: v
                       for k, v in log_data.items()
                       if k.startswith(EXTRA_LOGDATA_PREFIX)}

         log_entry = (db_level_of_py_level(log_data['levelno']), msg,
                      Json(extra_data), log_data['name'], self.fqdn,
                      os.getpid())
-
-        with self.conn.cursor() as cur:
+        db = self._connect()
+        with db.cursor() as cur:
             cur.execute('INSERT INTO log '
                         '(level, message, data, src_module, src_host, src_pid) '
                         'VALUES (%s, %s, %s, %s, %s, %s)',
                         log_entry)
-            self.conn.commit()
+        db.commit()
+        db.close()
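
Note on the handler change above: each emit() now opens a dedicated
connection, inserts a single row and closes it, which drops the pid-tracking
bookkeeping at the cost of one connection per log record. The swh_-prefix
extraction it performs can be checked in isolation (plain Python, no database
needed; the record dict below is illustrative only):

    EXTRA_LOGDATA_PREFIX = 'swh_'

    # roughly what record.__dict__ contains after
    # logger.info(..., extra={'swh_type': 'test', 'swh_meditation': 'guru'})
    log_data = {'name': 'root', 'levelno': 20,
                'swh_type': 'test', 'swh_meditation': 'guru'}

    extra_data = {k[len(EXTRA_LOGDATA_PREFIX):]: v
                  for k, v in log_data.items()
                  if k.startswith(EXTRA_LOGDATA_PREFIX)}

    assert extra_data == {'type': 'test', 'meditation': 'guru'}
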
diff --git a/swh/core/tests/test_hashutil.py b/swh/core/tests/test_hashutil.py
index 0931019..8b482db 100644
--- a/swh/core/tests/test_hashutil.py
+++ b/swh/core/tests/test_hashutil.py
@@ -1,77 +1,102 @@
 # Copyright (C) 2015 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import tempfile
 import unittest

 from nose.tools import istest

 from swh.core import hashutil


 class Hashlib(unittest.TestCase):

     def setUp(self):
         self.data = b'42\n'
         self.hex_checksums = {
-            'sha1': '34973274ccef6ab4dfaaf86599792fa9c3fe4689',
-            'sha1_git': 'd81cc0710eb6cf9efd5b920a8453e1e07157b6cd',
-            'sha256': '084c799cd551dd1d8d5c5f9a5d593b2e931f5e36'
-                      '122ee5c793c1d08a19839cc0',
-        }
+            'sha1': '34973274ccef6ab4dfaaf86599792fa9c3fe4689',
+            'sha1_git': 'd81cc0710eb6cf9efd5b920a8453e1e07157b6cd',
+            'sha1_blob_git': 'd81cc0710eb6cf9efd5b920a8453e1e07157b6cd',
+            'sha1_tree_git': 'a3b4138923e146bdf5b51bd6fd7c64e3f59dfcad',
+            'sha1_commit_git': 'ebbb89b3165385c35a0dfa78ff9059ddd28d5126',
+            'sha256': '084c799cd551dd1d8d5c5f9a5d593b2e931f5e36'
+                      '122ee5c793c1d08a19839cc0',
+        }

         self.checksums = {
             'sha1': bytes.fromhex('34973274ccef6ab4dfaaf865997'
                                   '92fa9c3fe4689'),
             'sha1_git': bytes.fromhex('d81cc0710eb6cf9efd5b920a845'
                                       '3e1e07157b6cd'),
+            'sha1_blob_git': bytes.fromhex('d81cc0710eb6cf9efd5b920a845'
+                                           '3e1e07157b6cd'),
+            'sha1_tree_git': bytes.fromhex('a3b4138923e146bdf5b51bd6fd7'
+                                           'c64e3f59dfcad'),
+            'sha1_commit_git': bytes.fromhex('ebbb89b3165385c35a0dfa78ff9'
+                                             '059ddd28d5126'),
             'sha256': bytes.fromhex('084c799cd551dd1d8d5c5f9a5d5'
                                     '93b2e931f5e36122ee5c793c1d0'
                                     '8a19839cc0'),
-        }
+        }

     @istest
     def hashdata(self):
-        checksums = hashutil.hashdata(self.data)
+        checksums = hashutil.hashdata(self.data,
+                                      algorithms=hashutil.KNOWN_ALGORITHMS)
         self.assertEqual(checksums, self.checksums)

     @istest
     def unknown_algo(self):
         with self.assertRaises(ValueError):
             hashutil.hashdata(self.data, algorithms=['does-not-exist'])

+        for known_hash_algo in hashutil.KNOWN_ALGORITHMS:
+            self.assertIsNotNone(hashutil._new_hash(known_hash_algo,
+                                                    length=10))
+
+    @istest
+    def fail_without_length_on_sha1_git_but_ok_otherwise(self):
+        for hash_algo in ['sha1_git', 'sha1_blob_git', 'sha1_tree_git',
+                          'sha1_commit_git']:
+            with self.assertRaises(ValueError):
+                hashutil._new_hash(hash_algo, length=None)
+
+        for other_hash_algo in ['sha1', 'sha256']:
+            self.assertIsNotNone(hashutil._new_hash(other_hash_algo,
+                                                    length=None))
+
     @istest
     def algo_selection(self):
         checksums = hashutil.hashdata(self.data,
                                       algorithms=['sha1', 'sha256'])
         self.assertIn('sha1', checksums)
         self.assertIn('sha256', checksums)
         self.assertNotIn('sha1_git', checksums)

     @istest
     def hashfile_by_name(self):
         with tempfile.NamedTemporaryFile() as f:
             f.write(self.data)
             f.flush()
-            checksums = hashutil.hashfile(f.name)
+            checksums = hashutil.hashfile(
+                f.name, length=None,
+                algorithms=hashutil.KNOWN_ALGORITHMS)
             self.assertEqual(checksums, self.checksums)

     @istest
     def hashfile_by_obj(self):
         with tempfile.TemporaryFile() as f:
             f.write(self.data)
             f.seek(0)
-            checksums = hashutil.hashfile(f, len(self.data))
+            checksums = hashutil.hashfile(
+                f, len(self.data),
+                algorithms=hashutil.KNOWN_ALGORITHMS)
             self.assertEqual(checksums, self.checksums)

     @istest
     def hex_to_hash(self):
         for algo in self.checksums:
             self.assertEqual(self.checksums[algo],
                              hashutil.hex_to_hash(self.hex_checksums[algo]))

     @istest
     def hash_to_hex(self):
         for algo in self.checksums:
             self.assertEqual(self.hex_checksums[algo],
                              hashutil.hash_to_hex(self.checksums[algo]))
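
The new `sha1_tree_git` and `sha1_commit_git` fixture values follow from the
same header scheme with a different object type; a sketch of how they could
be re-derived outside the test suite (expected digests taken from the
fixtures above):

    import hashlib

    data = b'42\n'
    for obj_type, expected in [
            ('tree', 'a3b4138923e146bdf5b51bd6fd7c64e3f59dfcad'),
            ('commit', 'ebbb89b3165385c35a0dfa78ff9059ddd28d5126')]:
        header = ('%s %d\0' % (obj_type, len(data))).encode('ascii')
        assert hashlib.sha1(header + data).hexdigest() == expected
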
diff --git a/swh/core/worker.py b/swh/core/worker.py
index fd30c52..2c81d23 100644
--- a/swh/core/worker.py
+++ b/swh/core/worker.py
@@ -1,98 +1,115 @@
 # Copyright (C) 2015 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import logging

 from celery import Celery
-from celery.signals import after_setup_logger, after_setup_task_logger
+from celery.signals import setup_logging
 from kombu import Exchange, Queue

 from swh.core.config import load_named_config
 from swh.core.logger import PostgresHandler

 CONFIG_NAME = 'worker.ini'
 DEFAULT_CONFIG = {
     'task_broker': ('str', 'amqp://guest@localhost//'),
     'task_modules': ('list[str]', []),
     'task_queues': ('list[str]', []),
     'task_soft_time_limit': ('int', 0),
 }


-@after_setup_logger.connect
-@after_setup_task_logger.connect
-def setup_log_handler(sender=None, logger=None, loglevel=None,
-                      logfile=None, format=None,
-                      colorize=None, **kwds):
-    """Setup the postgresql log handler"""
+@setup_logging.connect
+def setup_log_handler(loglevel=None, logfile=None, format=None,
+                      colorize=None, **kwargs):
+    """Setup logging according to Software Heritage preferences"""
-    handler = PostgresHandler(CONFIG['log_db'])
-    handler.setFormatter(logging.Formatter(format))
-    handler.setLevel(logging.DEBUG)
-    logger.addHandler(handler)
+    root_logger = logging.getLogger('')
+    root_logger.setLevel(logging.INFO)
+
+    console = logging.StreamHandler()
+    console.setLevel(logging.DEBUG)
+
+    pg = PostgresHandler(CONFIG['log_db'])
+    pg.setFormatter(logging.Formatter(format))
+    pg.setLevel(logging.DEBUG)
+
+    root_logger.addHandler(console)
+    root_logger.addHandler(pg)
+
+    celery_logger = logging.getLogger('celery')
+    celery_logger.setLevel(logging.INFO)
+
+    urllib3_logger = logging.getLogger('urllib3')
+    urllib3_logger.setLevel(logging.CRITICAL)
+
+    swh_logger = logging.getLogger('swh')
+    swh_logger.setLevel(logging.DEBUG)


 # Load the Celery config
 CONFIG = load_named_config(CONFIG_NAME, DEFAULT_CONFIG)

 # Celery Queues
 CELERY_QUEUES = [Queue('celery', Exchange('celery'), routing_key='celery')]

 for queue in CONFIG['task_queues']:
     CELERY_QUEUES.append(Queue(queue, Exchange(queue), routing_key=queue))

 # Instantiate the Celery app
 app = Celery()
 app.conf.update(
     # The broker
     BROKER_URL=CONFIG['task_broker'],
     # Timezone configuration: all in UTC
     CELERY_ENABLE_UTC=True,
     CELERY_TIMEZONE='UTC',
     # Imported modules
     CELERY_IMPORTS=CONFIG['task_modules'],
     # Time (in seconds, or a timedelta object) after which stored task
     # tombstones will be deleted.
     CELERY_TASK_RESULT_EXPIRES=3600,
     # Late ack means the task messages will be acknowledged after the task
     # has been executed, not just before, which is the default behavior.
     CELERY_ACKS_LATE=True,
     # A string identifying the default serialization method to use.
     # Can be pickle (default), json, yaml, msgpack or any custom
     # serialization methods that have been registered with
     # kombu.serialization.registry
     CELERY_ACCEPT_CONTENT=['msgpack', 'pickle', 'json'],
     # If True the task will report its status as “started”
     # when the task is executed by a worker.
     CELERY_TRACK_STARTED=True,
     # Default compression used for task messages. Can be gzip, bzip2
     # (if available), or any custom compression schemes registered
     # in the Kombu compression registry.
     # CELERY_MESSAGE_COMPRESSION='bzip2',
     # Disable all rate limits, even if tasks have explicit rate limits set.
     # (Disabling rate limits altogether is recommended if you don’t have any
     # tasks using them.)
     CELERY_DISABLE_RATE_LIMITS=True,
     # Task hard time limit in seconds. The worker processing the task will
     # be killed and replaced with a new one when this is exceeded.
     # CELERYD_TASK_TIME_LIMIT=3600,
     # Task soft time limit in seconds.
     # The SoftTimeLimitExceeded exception will be raised when this is
     # exceeded. The task can catch this to e.g. clean up before the hard
     # time limit comes.
     CELERYD_TASK_SOFT_TIME_LIMIT=CONFIG['task_soft_time_limit'],
     # Task routing
     CELERY_ROUTES={
         'swh.loader.git.tasks.LoadGitRepository': {
             'queue': 'swh_loader_git',
         },
         'swh.loader.git.tasks.LoadGitHubRepository': {
             'queue': 'swh_loader_git',
         },
         'swh.cloner.git.worker.tasks.execute_with_measure': {
             'queue': 'swh_cloner_git',
         },
     },
     # Task queues this worker will consume from
     CELERY_QUEUES=CELERY_QUEUES,
+    # Allow pool restarts from remote
+    CELERYD_POOL_RESTARTS=True,
 )
diff --git a/version.txt b/version.txt
index 855f8f7..763a1ad 100644
--- a/version.txt
+++ b/version.txt
@@ -1 +1 @@
-v0.0.8-0-gf6ad5e2
\ No newline at end of file
+v0.0.9-0-g5f371a8
\ No newline at end of file
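
With CELERYD_POOL_RESTARTS enabled as above, the worker pool can be restarted
over the broker using Celery's standard broadcast mechanism; a sketch,
assuming a worker built from this module is running and reachable through the
configured broker:

    from swh.core.worker import app

    # Ask connected workers to restart their process pool; this only
    # works because CELERYD_POOL_RESTARTS=True is set in the config.
    app.control.broadcast('pool_restart', arguments={'reload': True})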