diff --git a/swh/journal/replay.py b/swh/journal/replay.py
index bc44939..357705e 100644
--- a/swh/journal/replay.py
+++ b/swh/journal/replay.py
@@ -1,99 +1,150 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from time import time
 import logging
 from concurrent.futures import ThreadPoolExecutor

 from swh.storage import HashCollision
 from swh.model.hashutil import hash_to_hex
 from swh.objstorage.objstorage import ID_HASH_ALGO
 from swh.core.statsd import statsd

 logger = logging.getLogger(__name__)

+SHA1_SIZE = 20
+
+
 def process_replay_objects(all_objects, *, storage):
     for (object_type, objects) in all_objects.items():
         _insert_objects(object_type, objects, storage)


 def _insert_objects(object_type, objects, storage):
     if object_type == 'content':
         # TODO: insert 'content' in batches
         for object_ in objects:
             try:
                 storage.content_add_metadata([object_])
             except HashCollision as e:
                 logger.error('Hash collision: %s', e.args)
     elif object_type in ('directory', 'revision', 'release',
                          'snapshot', 'origin'):
         # TODO: split batches that are too large for the storage
         # to handle?
         method = getattr(storage, object_type + '_add')
         method(objects)
     elif object_type == 'origin_visit':
         for visit in objects:
             if isinstance(visit['origin'], str):
                 # old format; note that it will crash with the pg and
                 # in-mem storages if the origin is not already known,
                 # but there is no other choice because we can't add an
                 # origin without knowing its type. Non-pg storages
                 # don't use a numeric FK internally,
                 visit['origin'] = {'url': visit['origin']}
             else:
                 storage.origin_add_one(visit['origin'])
             if 'type' not in visit:
                 # old format
                 visit['type'] = visit['origin']['type']
         storage.origin_visit_upsert(objects)
     else:
         logger.warning('Received a series of %s, this should not happen',
                        object_type)


+def is_hash_in_bytearray(hash_, array, nb_hashes, hash_size=SHA1_SIZE):
+    """
+    Checks if the given hash is in the provided `array`. The array must be
+    a *sorted* list of sha1 hashes, and contain `nb_hashes` hashes
+    (so its size must be `nb_hashes*hash_size` bytes).
+
+    Args:
+        hash_ (bytes): the hash to look for
+        array (bytes): a sorted concatenated array of hashes (may be of
+            any type supporting slice indexing, e.g. :py:class:`mmap.mmap`)
+        nb_hashes (int): number of hashes in the array
+        hash_size (int): size of a hash (defaults to 20, for SHA1)
+
+    Example:
+
+    >>> import os
+    >>> hash1 = os.urandom(20)
+    >>> hash2 = os.urandom(20)
+    >>> hash3 = os.urandom(20)
+    >>> array = b''.join(sorted([hash1, hash2]))
+    >>> is_hash_in_bytearray(hash1, array, 2)
+    True
+    >>> is_hash_in_bytearray(hash2, array, 2)
+    True
+    >>> is_hash_in_bytearray(hash3, array, 2)
+    False
+    """
+    if len(hash_) != hash_size:
+        raise ValueError('hash_ does not match the provided hash_size.')
+
+    def get_hash(position):
+        return array[position*hash_size:(position+1)*hash_size]
+
+    # Regular dichotomy (binary search):
+    left = 0
+    right = nb_hashes
+    while left < right-1:
+        middle = int((right+left)/2)
+        pivot = get_hash(middle)
+        if pivot == hash_:
+            return True
+        elif pivot < hash_:
+            left = middle
+        else:
+            right = middle
+    return get_hash(left) == hash_
+
+
 def copy_object(obj_id, src, dst):
     statsd_name = 'swh_journal_content_replayer_%s_duration_seconds'
     try:
         with statsd.timed(statsd_name % 'get'):
             obj = src.get(obj_id)
         with statsd.timed(statsd_name % 'put'):
             dst.add(obj, obj_id=obj_id, check_presence=False)
             logger.debug('copied %s', hash_to_hex(obj_id))
             statsd.increment(
                 'swh_journal_content_replayer_bytes_total',
                 len(obj))
     except Exception:
         obj = ''
         logger.exception('Failed to copy %s', hash_to_hex(obj_id))
     return len(obj)


 def process_replay_objects_content(all_objects, *, src, dst, concurrency=8):
     vol = []
     t0 = time()
     with ThreadPoolExecutor(max_workers=concurrency) as executor:
         for (object_type, objects) in all_objects.items():
             if object_type != 'content':
                 logger.warning(
                     'Received a series of %s, this should not happen',
                     object_type)
                 continue
             for obj in objects:
                 obj_id = obj[ID_HASH_ALGO]
                 if obj['status'] == 'visible':
                     fut = executor.submit(copy_object, obj_id, src, dst)
                     fut.add_done_callback(lambda fn: vol.append(fn.result()))
                 else:
                     logger.debug('skipped %s (%s)',
                                  hash_to_hex(obj_id), obj['status'])
     dt = time() - t0
     logger.info(
         'processed %s content objects in %.1fsec '
         '(%.1f obj/sec, %.1fMB/sec) - %s failures',
         len(vol), dt,
         len(vol)/dt,
         sum(vol)/1024/1024/dt,
         len([x for x in vol if not x]))
diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py
index 12cb277..4bd9977 100644
--- a/swh/journal/tests/test_replay.py
+++ b/swh/journal/tests/test_replay.py
@@ -1,201 +1,216 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import datetime
 import functools
 import random
 from subprocess import Popen
 from typing import Tuple

 import dateutil
 from kafka import KafkaProducer
+from hypothesis import strategies, given, settings

 from swh.storage import get_storage
 from swh.storage.in_memory import ENABLE_ORIGIN_IDS

 from swh.journal.client import JournalClient
 from swh.journal.serializers import key_to_kafka, value_to_kafka
-from swh.journal.replay import process_replay_objects
+from swh.journal.replay import process_replay_objects, is_hash_in_bytearray

 from .conftest import OBJECT_TYPE_KEYS
 from .utils import MockedJournalClient, MockedKafkaWriter


 def test_storage_play(
         kafka_prefix: str, kafka_server: Tuple[Popen, int]):
     (_, port) = kafka_server
     kafka_prefix += '.swh.journal.objects'

     storage = get_storage('memory', {})

     producer = KafkaProducer(
         bootstrap_servers='localhost:{}'.format(port),
         key_serializer=key_to_kafka,
         value_serializer=value_to_kafka,
         client_id='test producer',
     )

     now = datetime.datetime.now(tz=datetime.timezone.utc)

     # Fill Kafka
     nb_sent = 0
     nb_visits = 0
     for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items():
         topic = kafka_prefix + '.' + object_type
         for object_ in objects:
             key = bytes(random.randint(0, 255) for _ in range(40))
             object_ = object_.copy()
             if object_type == 'content':
                 object_['ctime'] = now
             elif object_type == 'origin_visit':
                 nb_visits += 1
                 object_['visit'] = nb_visits
             producer.send(topic, key=key, value=object_)
             nb_sent += 1

     # Fill the storage from Kafka
     config = {
         'brokers': 'localhost:%d' % kafka_server[1],
         'group_id': 'replayer',
         'prefix': kafka_prefix,
         'max_messages': nb_sent,
     }
     replayer = JournalClient(**config)
     worker_fn = functools.partial(process_replay_objects, storage=storage)
     nb_inserted = 0
     while nb_inserted < nb_sent:
         nb_inserted += replayer.process(worker_fn)
     assert nb_sent == nb_inserted

     # Check the objects were actually inserted in the storage
     assert OBJECT_TYPE_KEYS['revision'][1] == \
         list(storage.revision_get(
             [rev['id'] for rev in OBJECT_TYPE_KEYS['revision'][1]]))
     assert OBJECT_TYPE_KEYS['release'][1] == \
         list(storage.release_get(
             [rel['id'] for rel in OBJECT_TYPE_KEYS['release'][1]]))

     origins = list(storage.origin_get(
         [orig for orig in OBJECT_TYPE_KEYS['origin'][1]]))
     assert OBJECT_TYPE_KEYS['origin'][1] == \
         [{'url': orig['url'], 'type': orig['type']} for orig in origins]

     for origin in origins:
         origin_id_or_url = \
             origin['id'] if ENABLE_ORIGIN_IDS else origin['url']
         expected_visits = [
             {
                 **visit,
                 'origin': origin_id_or_url,
                 'date': dateutil.parser.parse(visit['date']),
             }
             for visit in OBJECT_TYPE_KEYS['origin_visit'][1]
             if visit['origin']['url'] == origin['url']
             and visit['origin']['type'] == origin['type']
         ]
         actual_visits = list(storage.origin_visit_get(
             origin_id_or_url))
         for visit in actual_visits:
             del visit['visit']  # opaque identifier
         assert expected_visits == actual_visits

     contents = list(storage.content_get_metadata(
         [cont['sha1'] for cont in OBJECT_TYPE_KEYS['content'][1]]))
     assert None not in contents
     assert contents == OBJECT_TYPE_KEYS['content'][1]


 def test_write_replay_legacy_origin_visit1():
     """Test origin_visit when the 'origin' is just a string."""
     queue = []
     replayer = MockedJournalClient(queue)
     writer = MockedKafkaWriter(queue)

     # Note that flipping the order of these two insertions will crash
     # the test, because the legacy origin_format does not allow to create
     # the origin when needed (type is missing)
     now = datetime.datetime.now()
     writer.send('origin', 'foo', {
         'url': 'http://example.com/',
         'type': 'git',
     })
     writer.send('origin_visit', 'foo', {
         'visit': 1,
         'origin': 'http://example.com/',
         'date': now,
     })

     queue_size = sum(len(partition)
                      for batch in queue
                      for partition in batch.values())

     storage = get_storage('memory', {})
     worker_fn = functools.partial(process_replay_objects, storage=storage)
     nb_messages = 0
     while nb_messages < queue_size:
         nb_messages += replayer.process(worker_fn)

     visits = list(storage.origin_visit_get('http://example.com/'))

     if ENABLE_ORIGIN_IDS:
         assert visits == [{
             'visit': 1,
             'origin': 1,
             'date': now,
         }]
     else:
         assert visits == [{
             'visit': 1,
             'origin': 'http://example.com/',
             'date': now,
         }]


 def test_write_replay_legacy_origin_visit2():
     """Test origin_visit when 'type' is missing."""
     queue = []
     replayer = MockedJournalClient(queue)
     writer = MockedKafkaWriter(queue)

     now = datetime.datetime.now()
     writer.send('origin', 'foo', {
         'url': 'http://example.com/',
         'type': 'git',
     })
     writer.send('origin_visit', 'foo', {
         'visit': 1,
         'origin': {
             'url': 'http://example.com/',
             'type': 'git',
         },
         'date': now,
     })

     queue_size = sum(len(partition)
                      for batch in queue
                      for partition in batch.values())

     storage = get_storage('memory', {})
     worker_fn = functools.partial(process_replay_objects, storage=storage)
     nb_messages = 0
     while nb_messages < queue_size:
         nb_messages += replayer.process(worker_fn)

     visits = list(storage.origin_visit_get('http://example.com/'))

     if ENABLE_ORIGIN_IDS:
         assert visits == [{
             'visit': 1,
             'origin': 1,
             'date': now,
             'type': 'git',
         }]
     else:
         assert visits == [{
             'visit': 1,
             'origin': 'http://example.com/',
             'date': now,
             'type': 'git',
         }]
+
+
+hash_strategy = strategies.binary(min_size=20, max_size=20)
+
+
+@settings(max_examples=500)
+@given(strategies.sets(hash_strategy, min_size=0, max_size=500),
+       strategies.sets(hash_strategy, min_size=10))
+def test_is_hash_in_bytearray(haystack, needles):
+    array = b''.join(sorted(haystack))
+    needles |= haystack  # Exhaustively test for all objects in the array
+    for needle in needles:
+        assert is_hash_in_bytearray(needle, array, len(haystack)) == \
+            (needle in haystack)
diff --git a/tox.ini b/tox.ini
index 8033bbb..08ef39d 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,27 +1,27 @@
 [tox]
 envlist=flake8,py3-no-origin-ids,py3

 [testenv:py3]
 passenv=SWH_KAFKA_ROOT
 deps =
   .[testing]
   pytest-cov
 commands =
-  pytest --cov=swh --cov-branch {posargs}
+  pytest --cov=swh --cov-branch --doctest-modules {posargs}

 [testenv:py3-no-origin-ids]
 passenv=SWH_KAFKA_ROOT
 deps =
   .[testing]
   pytest-cov
 setenv =
   SWH_STORAGE_IN_MEMORY_ENABLE_ORIGIN_IDS=false
 commands =
-  pytest --cov=swh --cov-branch {posargs}
+  pytest --cov=swh --cov-branch --doctest-modules {posargs}

 [testenv:flake8]
 skip_install = true
 deps =
   flake8
 commands =
   {envpython} -m flake8
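
Usage note (not part of the diff): the docstring of the new `is_hash_in_bytearray` says the `array` argument may be any slice-indexable object, e.g. an mmap'd file, which lets a very large sorted hash list be searched without loading it in memory. A minimal sketch under that assumption follows; the file name `sorted_sha1s.bin` and the `sha1_is_known` wrapper are hypothetical, introduced here only for illustration.

    import mmap

    from swh.journal.replay import SHA1_SIZE, is_hash_in_bytearray


    def sha1_is_known(hash_, path='sorted_sha1s.bin'):
        # Memory-map a file of concatenated, sorted sha1 digests and run the
        # binary search against it; only the touched pages are read from disk.
        with open(path, 'rb') as f:
            with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as array:
                nb_hashes = len(array) // SHA1_SIZE
                return is_hash_in_bytearray(hash_, array, nb_hashes, SHA1_SIZE)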