diff --git a/requirements-swh.txt b/requirements-swh.txt index a11813c..99b8661 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,2 +1,3 @@ swh.core[db,http] >= 0.0.60 +swh-model >= 0.0.40 swh.storage >= 0.0.147 diff --git a/swh/journal/cli.py b/swh/journal/cli.py index d48a474..f9192e1 100644 --- a/swh/journal/cli.py +++ b/swh/journal/cli.py @@ -1,205 +1,232 @@ # Copyright (C) 2016-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import click import functools import logging +import mmap import os +import click + from swh.core import config from swh.core.cli import CONTEXT_SETTINGS +from swh.model.model import SHA1_SIZE from swh.storage import get_storage from swh.objstorage import get_objstorage from swh.journal.client import JournalClient +from swh.journal.replay import is_hash_in_bytearray from swh.journal.replay import process_replay_objects from swh.journal.replay import process_replay_objects_content from swh.journal.backfill import JournalBackfiller @click.group(name='journal', context_settings=CONTEXT_SETTINGS) @click.option('--config-file', '-C', default=None, type=click.Path(exists=True, dir_okay=False,), help="Configuration file.") @click.pass_context def cli(ctx, config_file): """Software Heritage Journal tools. The journal is a persistent logger of changes to the archive, with publish-subscribe support. """ if not config_file: config_file = os.environ.get('SWH_CONFIG_FILENAME') if config_file: if not os.path.exists(config_file): raise ValueError('%s does not exist' % config_file) conf = config.read(config_file) else: conf = {} ctx.ensure_object(dict) log_level = ctx.obj.get('log_level', logging.INFO) logging.root.setLevel(log_level) logging.getLogger('kafka').setLevel(logging.INFO) ctx.obj['config'] = conf def get_journal_client(ctx, **kwargs): conf = ctx.obj['config'].get('journal', {}) conf.update({k: v for (k, v) in kwargs.items() if v not in (None, ())}) if not conf.get('brokers'): ctx.fail('You must specify at least one kafka broker.') if not isinstance(conf['brokers'], (list, tuple)): conf['brokers'] = [conf['brokers']] return JournalClient(**conf) @cli.command() @click.option('--max-messages', '-m', default=None, type=int, help='Maximum number of objects to replay. Default is to ' 'run forever.') @click.option('--broker', 'brokers', type=str, multiple=True, help='Kafka broker to connect to. ' '(deprecated, use the config file instead)') @click.option('--prefix', type=str, default=None, help='Prefix of Kafka topic names to read from. ' '(deprecated, use the config file instead)') @click.option('--group-id', type=str, help='Name of the group id for reading from Kafka. ' '(deprecated, use the config file instead)') @click.pass_context def replay(ctx, brokers, prefix, group_id, max_messages): """Fill a Storage by reading a Journal. There can be several 'replayers' filling a Storage as long as they use the same `group-id`. 
""" logger = logging.getLogger(__name__) conf = ctx.obj['config'] try: storage = get_storage(**conf.pop('storage')) except KeyError: ctx.fail('You must have a storage configured in your config file.') client = get_journal_client( ctx, brokers=brokers, prefix=prefix, group_id=group_id) worker_fn = functools.partial(process_replay_objects, storage=storage) try: nb_messages = 0 while not max_messages or nb_messages < max_messages: nb_messages += client.process(worker_fn) logger.info('Processed %d messages.' % nb_messages) except KeyboardInterrupt: ctx.exit(0) else: print('Done.') @cli.command() @click.argument('object_type') @click.option('--start-object', default=None) @click.option('--end-object', default=None) @click.option('--dry-run', is_flag=True, default=False) @click.pass_context def backfiller(ctx, object_type, start_object, end_object, dry_run): """Run the backfiller The backfiller list objects from a Storage and produce journal entries from there. Typically used to rebuild a journal or compensate for missing objects in a journal (eg. due to a downtime of this later). The configuration file requires the following entries: - brokers: a list of kafka endpoints (the journal) in which entries will be added. - storage_dbconn: URL to connect to the storage DB. - prefix: the prefix of the topics (topics will be .). - client_id: the kafka client ID. """ conf = ctx.obj['config'] backfiller = JournalBackfiller(conf) try: backfiller.run( object_type=object_type, start_object=start_object, end_object=end_object, dry_run=dry_run) except KeyboardInterrupt: ctx.exit(0) @cli.command() @click.option('--max-messages', '-m', default=None, type=int, help='Maximum number of objects to replay. Default is to ' 'run forever.') @click.option('--concurrency', type=int, default=8, help='Concurrentcy level.') @click.option('--broker', 'brokers', type=str, multiple=True, help='Kafka broker to connect to.' '(deprecated, use the config file instead)') @click.option('--prefix', type=str, default=None, help='Prefix of Kafka topic names to read from.' '(deprecated, use the config file instead)') @click.option('--group-id', type=str, help='Name of the group id for reading from Kafka.' '(deprecated, use the config file instead)') +@click.option('--exclude-sha1-file', default=None, type=click.File('rb'), + help='File containing a sorted array of hashes to be excluded.') @click.pass_context -def content_replay(ctx, max_messages, concurrency, brokers, prefix, group_id): +def content_replay(ctx, max_messages, concurrency, + brokers, prefix, group_id, exclude_sha1_file): """Fill a destination Object Storage (typically a mirror) by reading a Journal and retrieving objects from an existing source ObjStorage. There can be several 'replayers' filling a given ObjStorage as long as they use the same `group-id`. This service retrieves object ids to copy from the 'content' topic. It will only copy object's content if the object's description in the kafka nmessage has the status:visible set. + + `--exclude-sha1-file` may be used to exclude some hashes to speed-up the + replay in case many of the contents are already in the destination + objstorage. It must contain a concatenation of all (sha1) hashes, + and it must be sorted. + This file will not be fully loaded into memory at any given time, + so it can be arbitrarily large. 
""" logger = logging.getLogger(__name__) conf = ctx.obj['config'] try: objstorage_src = get_objstorage(**conf.pop('objstorage_src')) except KeyError: ctx.fail('You must have a source objstorage configured in ' 'your config file.') try: objstorage_dst = get_objstorage(**conf.pop('objstorage_dst')) except KeyError: ctx.fail('You must have a destination objstorage configured ' 'in your config file.') + if exclude_sha1_file: + map_ = mmap.mmap(exclude_sha1_file.fileno(), 0, prot=mmap.PROT_READ) + if map_.size() % SHA1_SIZE != 0: + ctx.fail('--exclude-sha1 must link to a file whose size is an ' + 'exact multiple of %d bytes.' % SHA1_SIZE) + nb_excluded_hashes = int(map_.size()/SHA1_SIZE) + + def exclude_fn(obj): + return is_hash_in_bytearray(obj['sha1'], map_, nb_excluded_hashes) + else: + exclude_fn = None + client = get_journal_client( ctx, brokers=brokers, prefix=prefix, group_id=group_id, object_types=('content',)) worker_fn = functools.partial(process_replay_objects_content, src=objstorage_src, dst=objstorage_dst, - concurrency=concurrency) + concurrency=concurrency, + exclude_fn=exclude_fn) try: nb_messages = 0 while not max_messages or nb_messages < max_messages: nb_messages += client.process(worker_fn) logger.info('Processed %d messages.' % nb_messages) except KeyboardInterrupt: ctx.exit(0) else: print('Done.') def main(): logging.basicConfig() return cli(auto_envvar_prefix='SWH_JOURNAL') if __name__ == '__main__': main() diff --git a/swh/journal/replay.py b/swh/journal/replay.py index 357705e..d48bf6b 100644 --- a/swh/journal/replay.py +++ b/swh/journal/replay.py @@ -1,150 +1,201 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from time import time import logging from concurrent.futures import ThreadPoolExecutor -from swh.storage import HashCollision +from swh.core.statsd import statsd from swh.model.hashutil import hash_to_hex +from swh.model.model import SHA1_SIZE from swh.objstorage.objstorage import ID_HASH_ALGO -from swh.core.statsd import statsd +from swh.storage import HashCollision logger = logging.getLogger(__name__) -SHA1_SIZE = 20 - - def process_replay_objects(all_objects, *, storage): for (object_type, objects) in all_objects.items(): _insert_objects(object_type, objects, storage) def _insert_objects(object_type, objects, storage): if object_type == 'content': # TODO: insert 'content' in batches for object_ in objects: try: storage.content_add_metadata([object_]) except HashCollision as e: logger.error('Hash collision: %s', e.args) elif object_type in ('directory', 'revision', 'release', 'snapshot', 'origin'): # TODO: split batches that are too large for the storage # to handle? method = getattr(storage, object_type + '_add') method(objects) elif object_type == 'origin_visit': for visit in objects: if isinstance(visit['origin'], str): # old format; note that it will crash with the pg and # in-mem storages if the origin is not already known, # but there is no other choice because we can't add an # origin without knowing its type. 
Non-pg storages # don't use a numeric FK internally, visit['origin'] = {'url': visit['origin']} else: storage.origin_add_one(visit['origin']) if 'type' not in visit: # old format visit['type'] = visit['origin']['type'] storage.origin_visit_upsert(objects) else: logger.warning('Received a series of %s, this should not happen', object_type) def is_hash_in_bytearray(hash_, array, nb_hashes, hash_size=SHA1_SIZE): """ Checks if the given hash is in the provided `array`. The array must be a *sorted* list of sha1 hashes, and contain `nb_hashes` hashes (so its size must be `nb_hashes*hash_size` bytes). Args: hash_ (bytes): the hash to look for array (bytes): a sorted concatenated array of hashes (may be of any type supporting slice indexing, e.g. :py:class:`mmap.mmap`) nb_hashes (int): number of hashes in the array hash_size (int): size of a hash (defaults to 20, for SHA1) Example: >>> import os >>> hash1 = os.urandom(20) >>> hash2 = os.urandom(20) >>> hash3 = os.urandom(20) >>> array = b''.join(sorted([hash1, hash2])) >>> is_hash_in_bytearray(hash1, array, 2) True >>> is_hash_in_bytearray(hash2, array, 2) True >>> is_hash_in_bytearray(hash3, array, 2) False """ if len(hash_) != hash_size: raise ValueError('hash_ does not match the provided hash_size.') def get_hash(position): return array[position*hash_size:(position+1)*hash_size] # Regular dichotomy: left = 0 right = nb_hashes while left < right-1: middle = int((right+left)/2) pivot = get_hash(middle) if pivot == hash_: return True elif pivot < hash_: left = middle else: right = middle return get_hash(left) == hash_ def copy_object(obj_id, src, dst): statsd_name = 'swh_journal_content_replayer_%s_duration_seconds' try: with statsd.timed(statsd_name % 'get'): obj = src.get(obj_id) with statsd.timed(statsd_name % 'put'): dst.add(obj, obj_id=obj_id, check_presence=False) logger.debug('copied %s', hash_to_hex(obj_id)) statsd.increment( 'swh_journal_content_replayer_bytes_total', len(obj)) except Exception: obj = '' logger.exception('Failed to copy %s', hash_to_hex(obj_id)) return len(obj) -def process_replay_objects_content(all_objects, *, src, dst, concurrency=8): +def process_replay_objects_content(all_objects, *, src, dst, concurrency=8, + exclude_fn=None): + """ + Takes a list of records from Kafka (see + :py:func:`swh.journal.client.JournalClient.process`) and copies them + from the `src` objstorage to the `dst` objstorage, if: + + * `obj['status']` is `'visible'` + * `exclude_fn(obj)` is `False` (if `exclude_fn` is provided) + + Args: + all_objects Dict[str, List[dict]]: Objects passed by the Kafka client. + Most importantly, `all_objects['content'][*]['sha1']` is the + sha1 hash of each content + src: An object storage (see :py:func:`swh.objstorage.get_objstorage`) + dst: An object storage (see :py:func:`swh.objstorage.get_objstorage`) + exclude_fn Optional[Callable[dict, bool]]: Determines whether + an object should be copied. + + Example: + + >>> from swh.objstorage import get_objstorage + >>> src = get_objstorage('memory', {}) + >>> dst = get_objstorage('memory', {}) + >>> id1 = src.add(b'foo bar') + >>> id2 = src.add(b'baz qux') + >>> kafka_partitions = { + ... 'content': [ + ... { + ... 'sha1': id1, + ... 'status': 'visible', + ... }, + ... { + ... 'sha1': id2, + ... 'status': 'visible', + ... }, + ... ] + ... } + >>> process_replay_objects_content( + ... kafka_partitions, src=src, dst=dst, + ... 
exclude_fn=lambda obj: obj['sha1'] == id1) + >>> id1 in dst + False + >>> id2 in dst + True + """ vol = [] + nb_skipped = 0 t0 = time() with ThreadPoolExecutor(max_workers=concurrency) as executor: for (object_type, objects) in all_objects.items(): if object_type != 'content': logger.warning( 'Received a series of %s, this should not happen', object_type) continue for obj in objects: obj_id = obj[ID_HASH_ALGO] - if obj['status'] == 'visible': + if obj['status'] != 'visible': + nb_skipped += 1 + logger.debug('skipped %s (status=%s)', + hash_to_hex(obj_id), obj['status']) + elif exclude_fn and exclude_fn(obj): + nb_skipped += 1 + logger.debug('skipped %s (manually excluded)', + hash_to_hex(obj_id)) + else: fut = executor.submit(copy_object, obj_id, src, dst) fut.add_done_callback(lambda fn: vol.append(fn.result())) - else: - logger.debug('skipped %s (%s)', - hash_to_hex(obj_id), obj['status']) dt = time() - t0 logger.info( 'processed %s content objects in %.1fsec ' - '(%.1f obj/sec, %.1fMB/sec) - %s failures', + '(%.1f obj/sec, %.1fMB/sec) - %d failures - %d skipped', len(vol), dt, len(vol)/dt, - sum(vol)/1024/1024/dt, len([x for x in vol if not x])) + sum(vol)/1024/1024/dt, + len([x for x in vol if not x]), + nb_skipped) diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py index 5aba92b..3e876ca 100644 --- a/swh/journal/tests/test_cli.py +++ b/swh/journal/tests/test_cli.py @@ -1,167 +1,205 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import functools import re import tempfile from subprocess import Popen from typing import Tuple from unittest.mock import patch from click.testing import CliRunner from kafka import KafkaProducer import pytest from swh.objstorage.backends.in_memory import InMemoryObjStorage from swh.storage.in_memory import Storage from swh.journal.cli import cli from swh.journal.serializers import key_to_kafka, value_to_kafka CLI_CONFIG = ''' storage: cls: memory args: {} objstorage_src: cls: mocked args: name: src objstorage_dst: cls: mocked args: name: dst ''' @pytest.fixture def storage(): """An instance of swh.storage.in_memory.Storage that gets injected into the CLI functions.""" storage = Storage() with patch('swh.journal.cli.get_storage') as get_storage_mock: get_storage_mock.return_value = storage yield storage def invoke(catch_exceptions, args): runner = CliRunner() with tempfile.NamedTemporaryFile('a', suffix='.yml') as config_fd: config_fd.write(CLI_CONFIG) config_fd.seek(0) args = ['-C' + config_fd.name] + args result = runner.invoke(cli, args) if not catch_exceptions and result.exception: print(result.output) raise result.exception return result def test_replay( storage: Storage, kafka_prefix: str, kafka_server: Tuple[Popen, int]): (_, port) = kafka_server kafka_prefix += '.swh.journal.objects' producer = KafkaProducer( bootstrap_servers='localhost:{}'.format(port), key_serializer=key_to_kafka, value_serializer=value_to_kafka, client_id='test-producer', ) snapshot = {'id': b'foo', 'branches': { b'HEAD': { 'target_type': 'revision', 'target': b'bar', } }} producer.send( topic=kafka_prefix+'.snapshot', key=snapshot['id'], value=snapshot) result = invoke(False, [ 'replay', '--broker', 'localhost:%d' % port, '--group-id', 'test-cli-consumer', '--prefix', kafka_prefix, '--max-messages', '1', ]) expected = r'Done.\n' assert result.exit_code == 0, 
result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output assert storage.snapshot_get(snapshot['id']) == { **snapshot, 'next_branch': None} def _patch_objstorages(names): objstorages = {name: InMemoryObjStorage() for name in names} def get_mock_objstorage(cls, args): assert cls == 'mocked', cls return objstorages[args['name']] def decorator(f): @functools.wraps(f) @patch('swh.journal.cli.get_objstorage') def newf(get_objstorage_mock, *args, **kwargs): get_objstorage_mock.side_effect = get_mock_objstorage f(*args, objstorages=objstorages, **kwargs) return newf return decorator def _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages): producer = KafkaProducer( bootstrap_servers='localhost:{}'.format(kafka_port), key_serializer=key_to_kafka, value_serializer=value_to_kafka, client_id='test-producer', ) contents = {} for i in range(10): content = b'\x00'*19 + bytes([i]) sha1 = objstorages['src'].add(content) contents[sha1] = content producer.send(topic=kafka_prefix+'.content', key=sha1, value={ 'sha1': sha1, 'status': 'visible', }) producer.flush() return contents @_patch_objstorages(['src', 'dst']) def test_replay_content( objstorages, storage: Storage, kafka_prefix: str, kafka_server: Tuple[Popen, int]): (_, kafka_port) = kafka_server kafka_prefix += '.swh.journal.objects' contents = _fill_objstorage_and_kafka( kafka_port, kafka_prefix, objstorages) result = invoke(False, [ 'content-replay', '--broker', 'localhost:%d' % kafka_port, '--group-id', 'test-cli-consumer', '--prefix', kafka_prefix, '--max-messages', '10', ]) expected = r'Done.\n' assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output for (sha1, content) in contents.items(): assert sha1 in objstorages['dst'], sha1 assert objstorages['dst'].get(sha1) == content + + +@_patch_objstorages(['src', 'dst']) +def test_replay_content_exclude( + objstorages, + storage: Storage, + kafka_prefix: str, + kafka_server: Tuple[Popen, int]): + (_, kafka_port) = kafka_server + kafka_prefix += '.swh.journal.objects' + + contents = _fill_objstorage_and_kafka( + kafka_port, kafka_prefix, objstorages) + + excluded_contents = list(contents)[0::2] # picking half of them + with tempfile.NamedTemporaryFile(mode='w+b') as fd: + fd.write(b''.join(sorted(excluded_contents))) + + fd.seek(0) + + result = invoke(False, [ + 'content-replay', + '--broker', 'localhost:%d' % kafka_port, + '--group-id', 'test-cli-consumer', + '--prefix', kafka_prefix, + '--max-messages', '10', + '--exclude-sha1-file', fd.name, + ]) + expected = r'Done.\n' + assert result.exit_code == 0, result.output + assert re.fullmatch(expected, result.output, re.MULTILINE), result.output + + for (sha1, content) in contents.items(): + if sha1 in excluded_contents: + assert sha1 not in objstorages['dst'], sha1 + else: + assert sha1 in objstorages['dst'], sha1 + assert objstorages['dst'].get(sha1) == content
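
Note on the file format expected by the new `--exclude-sha1-file` option: it is the raw, sorted concatenation of 20-byte binary sha1 digests. Below is a minimal sketch, not part of this diff, of how such a file could be produced; the input file of hexadecimal digests (one per line) and the helper name are hypothetical.

    import sys

    SHA1_SIZE = 20  # size in bytes of a binary sha1 digest


    def build_exclude_file(hex_digests_path, output_path):
        """Write the sorted concatenation of binary sha1s expected by
        `content-replay --exclude-sha1-file`."""
        with open(hex_digests_path) as f:
            digests = [bytes.fromhex(line.strip()) for line in f if line.strip()]
        if any(len(d) != SHA1_SIZE for d in digests):
            raise ValueError('input contains a digest that is not a sha1')
        with open(output_path, 'wb') as f:
            # Sorting is required so the replayer can binary-search the file.
            f.write(b''.join(sorted(digests)))


    if __name__ == '__main__':
        build_exclude_file(sys.argv[1], sys.argv[2])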
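
And a sketch of the consuming side, mirroring the `exclude_fn` closure built in `content_replay` above: the file is mmap'ed read-only, its size checked to be a whole number of hashes, and membership tested with `is_hash_in_bytearray`. The hashes and temporary file are generated on the fly for illustration, and `access=mmap.ACCESS_READ` is used as a portable stand-in for the `prot=mmap.PROT_READ` call in the CLI.

    import mmap
    import os
    import tempfile

    from swh.journal.replay import is_hash_in_bytearray

    SHA1_SIZE = 20  # the CLI imports this constant from swh.model.model

    # Build a small, sorted exclusion file from three random "hashes".
    hashes = sorted(os.urandom(SHA1_SIZE) for _ in range(3))
    with tempfile.NamedTemporaryFile(delete=False) as f:
        f.write(b''.join(hashes))
        path = f.name

    with open(path, 'rb') as f:
        map_ = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
        assert map_.size() % SHA1_SIZE == 0, 'not a whole number of hashes'
        nb_hashes = map_.size() // SHA1_SIZE

        def exclude_fn(obj):
            # Same shape as the callback passed to process_replay_objects_content.
            return is_hash_in_bytearray(obj['sha1'], map_, nb_hashes)

        assert exclude_fn({'sha1': hashes[0]})
        assert not exclude_fn({'sha1': os.urandom(SHA1_SIZE)})

    os.unlink(path)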