diff --git a/swh/journal/replay.py b/swh/journal/replay.py index b1a3f61..0bfb964 100644 --- a/swh/journal/replay.py +++ b/swh/journal/replay.py @@ -1,394 +1,392 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import copy from time import time import logging from contextlib import contextmanager from swh.core.statsd import statsd from swh.model.identifiers import normalize_timestamp from swh.model.hashutil import hash_to_hex from swh.model.model import SHA1_SIZE from swh.objstorage.objstorage import ID_HASH_ALGO from swh.storage import HashCollision logger = logging.getLogger(__name__) def process_replay_objects(all_objects, *, storage): for (object_type, objects) in all_objects.items(): logger.debug("Inserting %s %s objects", len(objects), object_type) _insert_objects(object_type, objects, storage) def _fix_revision_pypi_empty_string(rev): """PyPI loader failed to encode empty strings as bytes, see: swh:1:rev:8f0095ee0664867055d03de9bcc8f95b91d8a2b9 or https://forge.softwareheritage.org/D1772 """ rev = { **rev, 'author': rev['author'].copy(), 'committer': rev['committer'].copy(), } if rev['author'].get('email') == '': rev['author']['email'] = b'' if rev['author'].get('name') == '': rev['author']['name'] = b'' if rev['committer'].get('email') == '': rev['committer']['email'] = b'' if rev['committer'].get('name') == '': rev['committer']['name'] = b'' return rev def _fix_revision_transplant_source(rev): if rev.get('metadata') and rev['metadata'].get('extra_headers'): rev = copy.deepcopy(rev) rev['metadata']['extra_headers'] = [ [key, value.encode('ascii')] if key == 'transplant_source' and isinstance(value, str) else [key, value] for (key, value) in rev['metadata']['extra_headers']] return rev def _check_date(date): """Returns whether the date can be represented in backends with sane limits on timestamps and timezeones (resp. signed 64-bits and signed 16 bits), and that microseconds is valid (ie. between 0 and 10^6). """ date = normalize_timestamp(date) return (-2**63 <= date['timestamp']['seconds'] < 2**63) \ and (0 <= date['timestamp']['microseconds'] < 10**6) \ and (-2**15 <= date['offset'] < 2**15) def _check_revision_date(rev): """Exclude revisions with invalid dates. See https://forge.softwareheritage.org/T1339""" return _check_date(rev['date']) and _check_date(rev['committer_date']) def _fix_revisions(revisions): good_revisions = [] for rev in revisions: rev = _fix_revision_pypi_empty_string(rev) rev = _fix_revision_transplant_source(rev) if not _check_revision_date(rev): logging.warning('Excluding revision (invalid date): %r', rev) continue good_revisions.append(rev) return good_revisions def _fix_origin_visits(visits): good_visits = [] for visit in visits: visit = visit.copy() if isinstance(visit['origin'], str): # note that it will crash with the pg and # in-mem storages if the origin is not already known, # but there is no other choice because we can't add an # origin without knowing its type. Non-pg storages # don't use a numeric FK internally, visit['origin'] = {'url': visit['origin']} else: if 'type' not in visit: visit['type'] = visit['origin']['type'] good_visits.append(visit) return good_visits def fix_objects(object_type, objects): """Converts a possibly old object from the journal to its current expected format. List of conversions: Empty author name/email in PyPI releases: >>> from pprint import pprint >>> date = { ... 'timestamp': { ... 'seconds': 1565096932, ... 'microseconds': 0, ... }, ... 'offset': 0, ... } >>> pprint(fix_objects('revision', [{ ... 'author': {'email': '', 'fullname': b'', 'name': ''}, ... 'committer': {'email': '', 'fullname': b'', 'name': ''}, ... 'date': date, ... 'committer_date': date, ... }])) [{'author': {'email': b'', 'fullname': b'', 'name': b''}, 'committer': {'email': b'', 'fullname': b'', 'name': b''}, 'committer_date': {'offset': 0, 'timestamp': {'microseconds': 0, 'seconds': 1565096932}}, 'date': {'offset': 0, 'timestamp': {'microseconds': 0, 'seconds': 1565096932}}}] Fix type of 'transplant_source' extra headers: >>> revs = fix_objects('revision', [{ ... 'author': {'email': '', 'fullname': b'', 'name': ''}, ... 'committer': {'email': '', 'fullname': b'', 'name': ''}, ... 'date': date, ... 'committer_date': date, ... 'metadata': { ... 'extra_headers': [ ... ['time_offset_seconds', b'-3600'], ... ['transplant_source', '29c154a012a70f49df983625090434587622b39e'] ... ]} ... }]) >>> pprint(revs[0]['metadata']['extra_headers']) [['time_offset_seconds', b'-3600'], ['transplant_source', b'29c154a012a70f49df983625090434587622b39e']] Filter out revisions with invalid dates: >>> from copy import deepcopy >>> invalid_date1 = deepcopy(date) >>> invalid_date1['timestamp']['microseconds'] = 1000000000 # > 10^6 >>> fix_objects('revision', [{ ... 'author': {'email': '', 'fullname': b'', 'name': b''}, ... 'committer': {'email': '', 'fullname': b'', 'name': b''}, ... 'date': invalid_date1, ... 'committer_date': date, ... }]) [] >>> invalid_date2 = deepcopy(date) >>> invalid_date2['timestamp']['seconds'] = 2**70 # > 10^63 >>> fix_objects('revision', [{ ... 'author': {'email': '', 'fullname': b'', 'name': b''}, ... 'committer': {'email': '', 'fullname': b'', 'name': b''}, ... 'date': invalid_date2, ... 'committer_date': date, ... }]) [] >>> invalid_date3 = deepcopy(date) >>> invalid_date3['offset'] = 2**20 # > 10^15 >>> fix_objects('revision', [{ ... 'author': {'email': '', 'fullname': b'', 'name': b''}, ... 'committer': {'email': '', 'fullname': b'', 'name': b''}, ... 'date': date, ... 'committer_date': invalid_date3, ... }]) [] `visit['origin']` is an URL instead of a dict: >>> fix_objects('origin_visit', [{'origin': 'http://foo'}]) [{'origin': {'url': 'http://foo'}}] `visit['type']` is missing , but `origin['visit']['type']` exists: >>> pprint(fix_objects( ... 'origin_visit', ... [{'origin': {'type': 'hg', 'url': 'http://foo'}}])) [{'origin': {'type': 'hg', 'url': 'http://foo'}, 'type': 'hg'}] """ # noqa if object_type == 'revision': objects = _fix_revisions(objects) elif object_type == 'origin_visit': objects = _fix_origin_visits(objects) return objects def _insert_objects(object_type, objects, storage): objects = fix_objects(object_type, objects) - if object_type == 'content': # TODO: insert 'content' in batches for object_ in objects: try: storage.content_add_metadata([object_]) except HashCollision as e: logger.error('Hash collision: %s', e.args) elif object_type in ('directory', 'revision', 'release', 'snapshot', 'origin'): # TODO: split batches that are too large for the storage # to handle? method = getattr(storage, object_type + '_add') method(objects) elif object_type == 'origin_visit': for visit in objects: - if 'type' in visit['origin']: - storage.origin_add_one(visit['origin']) + storage.origin_add_one(visit['origin']) if 'metadata' not in visit: visit['metadata'] = None storage.origin_visit_upsert(objects) else: logger.warning('Received a series of %s, this should not happen', object_type) def is_hash_in_bytearray(hash_, array, nb_hashes, hash_size=SHA1_SIZE): """ Checks if the given hash is in the provided `array`. The array must be a *sorted* list of sha1 hashes, and contain `nb_hashes` hashes (so its size must by `nb_hashes*hash_size` bytes). Args: hash_ (bytes): the hash to look for array (bytes): a sorted concatenated array of hashes (may be of any type supporting slice indexing, eg. :py:cls:`mmap.mmap`) nb_hashes (int): number of hashes in the array hash_size (int): size of a hash (defaults to 20, for SHA1) Example: >>> import os >>> hash1 = os.urandom(20) >>> hash2 = os.urandom(20) >>> hash3 = os.urandom(20) >>> array = b''.join(sorted([hash1, hash2])) >>> is_hash_in_bytearray(hash1, array, 2) True >>> is_hash_in_bytearray(hash2, array, 2) True >>> is_hash_in_bytearray(hash3, array, 2) False """ if len(hash_) != hash_size: raise ValueError('hash_ does not match the provided hash_size.') def get_hash(position): return array[position*hash_size:(position+1)*hash_size] # Regular dichotomy: left = 0 right = nb_hashes while left < right-1: middle = int((right+left)/2) pivot = get_hash(middle) if pivot == hash_: return True elif pivot < hash_: left = middle else: right = middle return get_hash(left) == hash_ @contextmanager def retry(max_retries): lasterror = None for i in range(max_retries): try: yield break except Exception as exc: lasterror = exc else: raise lasterror def copy_object(obj_id, src, dst, max_retries=3): statsd_name = 'swh_journal_content_replayer_%s_duration_seconds' try: with statsd.timed(statsd_name % 'get'): with retry(max_retries): obj = src.get(obj_id) logger.debug('retrieved %s', hash_to_hex(obj_id)) with statsd.timed(statsd_name % 'put'): with retry(max_retries): dst.add(obj, obj_id=obj_id, check_presence=False) logger.debug('copied %s', hash_to_hex(obj_id)) statsd.increment( 'swh_journal_content_replayer_bytes_total', len(obj)) except Exception: obj = '' logger.error('Failed to copy %s', hash_to_hex(obj_id)) raise return len(obj) def process_replay_objects_content(all_objects, *, src, dst, exclude_fn=None): """ Takes a list of records from Kafka (see :py:func:`swh.journal.client.JournalClient.process`) and copies them from the `src` objstorage to the `dst` objstorage, if: * `obj['status']` is `'visible'` * `exclude_fn(obj)` is `False` (if `exclude_fn` is provided) Args: all_objects Dict[str, List[dict]]: Objects passed by the Kafka client. Most importantly, `all_objects['content'][*]['sha1']` is the sha1 hash of each content src: An object storage (see :py:func:`swh.objstorage.get_objstorage`) dst: An object storage (see :py:func:`swh.objstorage.get_objstorage`) exclude_fn Optional[Callable[dict, bool]]: Determines whether an object should be copied. Example: >>> from swh.objstorage import get_objstorage >>> src = get_objstorage('memory', {}) >>> dst = get_objstorage('memory', {}) >>> id1 = src.add(b'foo bar') >>> id2 = src.add(b'baz qux') >>> kafka_partitions = { ... 'content': [ ... { ... 'sha1': id1, ... 'status': 'visible', ... }, ... { ... 'sha1': id2, ... 'status': 'visible', ... }, ... ] ... } >>> process_replay_objects_content( ... kafka_partitions, src=src, dst=dst, ... exclude_fn=lambda obj: obj['sha1'] == id1) >>> id1 in dst False >>> id2 in dst True """ vol = [] nb_skipped = 0 t0 = time() for (object_type, objects) in all_objects.items(): if object_type != 'content': logger.warning( 'Received a series of %s, this should not happen', object_type) continue for obj in objects: obj_id = obj[ID_HASH_ALGO] if obj['status'] != 'visible': nb_skipped += 1 logger.debug('skipped %s (status=%s)', hash_to_hex(obj_id), obj['status']) elif exclude_fn and exclude_fn(obj): nb_skipped += 1 logger.debug('skipped %s (manually excluded)', hash_to_hex(obj_id)) else: vol.append(copy_object(obj_id, src, dst)) dt = time() - t0 logger.info( 'processed %s content objects in %.1fsec ' '(%.1f obj/sec, %.1fMB/sec) - %d failures - %d skipped', len(vol), dt, len(vol)/dt, sum(vol)/1024/1024/dt, len([x for x in vol if not x]), nb_skipped) diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py index 898dea5..598924c 100644 --- a/swh/journal/tests/conftest.py +++ b/swh/journal/tests/conftest.py @@ -1,237 +1,235 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import pytest import logging import random import string from confluent_kafka import Consumer from subprocess import Popen from typing import Any, Dict, List, Optional, Tuple from pathlib import Path from pytest_kafka import ( make_zookeeper_process, make_kafka_server ) from swh.model.hashutil import hash_to_bytes logger = logging.getLogger(__name__) CONTENTS = [ { 'length': 3, 'sha1': hash_to_bytes( '34973274ccef6ab4dfaaf86599792fa9c3fe4689'), 'sha1_git': b'foo', 'blake2s256': b'bar', 'sha256': b'baz', 'status': 'visible', }, ] COMMITTERS = [ { 'fullname': b'foo', 'name': b'foo', 'email': b'', }, { 'fullname': b'bar', 'name': b'bar', 'email': b'', } ] DATES = [ { 'timestamp': { 'seconds': 1234567891, 'microseconds': 0, }, 'offset': 120, 'negative_utc': None, }, { 'timestamp': { 'seconds': 1234567892, 'microseconds': 0, }, 'offset': 120, 'negative_utc': None, } ] REVISIONS = [ { 'id': hash_to_bytes('7026b7c1a2af56521e951c01ed20f255fa054238'), 'message': b'hello', 'date': DATES[0], 'committer': COMMITTERS[0], 'author': COMMITTERS[0], 'committer_date': DATES[0], 'type': 'git', 'directory': '\x01'*20, 'synthetic': False, 'metadata': None, 'parents': [], }, { 'id': hash_to_bytes('368a48fe15b7db2383775f97c6b247011b3f14f4'), 'message': b'hello again', 'date': DATES[1], 'committer': COMMITTERS[1], 'author': COMMITTERS[1], 'committer_date': DATES[1], 'type': 'hg', 'directory': '\x02'*20, 'synthetic': False, 'metadata': None, 'parents': [], }, ] RELEASES = [ { 'id': hash_to_bytes('d81cc0710eb6cf9efd5b920a8453e1e07157b6cd'), 'name': b'v0.0.1', 'date': { 'timestamp': { 'seconds': 1234567890, 'microseconds': 0, }, 'offset': 120, 'negative_utc': None, }, 'author': COMMITTERS[0], 'target_type': 'revision', 'target': b'\x04'*20, 'message': b'foo', 'synthetic': False, }, ] ORIGINS = [ { 'url': 'https://somewhere.org/den/fox', - 'type': 'git', }, { 'url': 'https://overtherainbow.org/fox/den', - 'type': 'svn', } ] ORIGIN_VISITS = [ { 'origin': ORIGINS[0], 'date': '2013-05-07 04:20:39.369271+00:00', 'snapshot': None, # TODO 'status': 'ongoing', # TODO 'metadata': {'foo': 'bar'}, 'type': 'git', }, { 'origin': ORIGINS[0], 'date': '2018-11-27 17:20:39+00:00', 'snapshot': None, # TODO 'status': 'ongoing', # TODO 'metadata': {'baz': 'qux'}, 'type': 'git', } ] # From type to tuple (id, ) OBJECT_TYPE_KEYS = { 'content': ('sha1', CONTENTS), 'revision': ('id', REVISIONS), 'release': ('id', RELEASES), 'origin': (None, ORIGINS), 'origin_visit': (None, ORIGIN_VISITS), } # type: Dict[str, Tuple[Optional[str], List[Dict[str, Any]]]] KAFKA_ROOT = os.environ.get('SWH_KAFKA_ROOT') KAFKA_ROOT = KAFKA_ROOT if KAFKA_ROOT else os.path.dirname(__file__) + '/kafka' if not os.path.exists(KAFKA_ROOT): msg = ('Development error: %s must exist and target an ' 'existing kafka installation' % KAFKA_ROOT) raise ValueError(msg) KAFKA_SCRIPTS = Path(KAFKA_ROOT) / 'bin' KAFKA_BIN = str(KAFKA_SCRIPTS / 'kafka-server-start.sh') ZOOKEEPER_BIN = str(KAFKA_SCRIPTS / 'zookeeper-server-start.sh') # Those defines fixtures zookeeper_proc = make_zookeeper_process(ZOOKEEPER_BIN, scope='session') os.environ['KAFKA_LOG4J_OPTS'] = \ '-Dlog4j.configuration=file:%s/log4j.properties' % \ os.path.dirname(__file__) kafka_server = make_kafka_server(KAFKA_BIN, 'zookeeper_proc', scope='session') kafka_logger = logging.getLogger('kafka') kafka_logger.setLevel(logging.WARN) @pytest.fixture(scope='function') def kafka_prefix(): return ''.join(random.choice(string.ascii_lowercase) for _ in range(10)) TEST_CONFIG = { 'consumer_id': 'swh.journal.consumer', 'object_types': OBJECT_TYPE_KEYS.keys(), 'max_messages': 1, # will read 1 message and stops 'storage': {'cls': 'memory', 'args': {}}, } @pytest.fixture def test_config(kafka_server: Tuple[Popen, int], kafka_prefix: str): """Test configuration needed for producer/consumer """ _, port = kafka_server return { **TEST_CONFIG, 'brokers': ['127.0.0.1:{}'.format(port)], 'prefix': kafka_prefix + '.swh.journal.objects', } @pytest.fixture def consumer( kafka_server: Tuple[Popen, int], test_config: Dict, kafka_prefix: str, ) -> Consumer: """Get a connected Kafka consumer. """ _, kafka_port = kafka_server consumer = Consumer({ 'bootstrap.servers': '127.0.0.1:{}'.format(kafka_port), 'auto.offset.reset': 'earliest', 'enable.auto.commit': True, 'group.id': "test-consumer-%s" % kafka_prefix, }) kafka_topics = [ '%s.%s' % (test_config['prefix'], object_type) for object_type in test_config['object_types'] ] consumer.subscribe(kafka_topics) yield consumer consumer.close() diff --git a/swh/journal/tests/test_kafka_writer.py b/swh/journal/tests/test_kafka_writer.py index 2f36c6b..99f0265 100644 --- a/swh/journal/tests/test_kafka_writer.py +++ b/swh/journal/tests/test_kafka_writer.py @@ -1,136 +1,136 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict import datetime from confluent_kafka import Consumer, KafkaException from subprocess import Popen from typing import Tuple from swh.storage import get_storage from swh.journal.writer.kafka import KafkaJournalWriter from swh.journal.serializers import ( kafka_to_key, kafka_to_value ) from .conftest import OBJECT_TYPE_KEYS def assert_written(consumer, kafka_prefix, expected_messages): consumed_objects = defaultdict(list) fetched_messages = 0 retries_left = 1000 while fetched_messages < expected_messages: if retries_left == 0: raise ValueError('Timed out fetching messages from kafka') msg = consumer.poll(timeout=0.01) if not msg: retries_left -= 1 continue error = msg.error() if error is not None: if error.fatal(): raise KafkaException(error) retries_left -= 1 continue fetched_messages += 1 consumed_objects[msg.topic()].append( (kafka_to_key(msg.key()), kafka_to_value(msg.value())) ) for (object_type, (key_name, objects)) in OBJECT_TYPE_KEYS.items(): topic = kafka_prefix + '.' + object_type (keys, values) = zip(*consumed_objects[topic]) if key_name: assert list(keys) == [object_[key_name] for object_ in objects] else: pass # TODO if object_type == 'origin_visit': for value in values: del value['visit'] elif object_type == 'content': for value in values: del value['ctime'] for object_ in objects: assert object_ in values def test_kafka_writer( kafka_prefix: str, kafka_server: Tuple[Popen, int], consumer: Consumer): kafka_prefix += '.swh.journal.objects' config = { 'brokers': ['localhost:%d' % kafka_server[1]], 'client_id': 'kafka_writer', 'prefix': kafka_prefix, } writer = KafkaJournalWriter(**config) expected_messages = 0 for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items(): for (num, object_) in enumerate(objects): if object_type == 'origin_visit': object_ = {**object_, 'visit': num} if object_type == 'content': object_ = {**object_, 'ctime': datetime.datetime.now()} writer.write_addition(object_type, object_) expected_messages += 1 assert_written(consumer, kafka_prefix, expected_messages) def test_storage_direct_writer( kafka_prefix: str, kafka_server: Tuple[Popen, int], consumer: Consumer): kafka_prefix += '.swh.journal.objects' config = { 'brokers': ['localhost:%d' % kafka_server[1]], 'client_id': 'kafka_writer', 'prefix': kafka_prefix, } storage = get_storage('memory', {'journal_writer': { 'cls': 'kafka', 'args': config}}) expected_messages = 0 for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items(): method = getattr(storage, object_type + '_add') if object_type in ('content', 'directory', 'revision', 'release', 'snapshot', 'origin'): if object_type == 'content': objects = [{**obj, 'data': b''} for obj in objects] method(objects) expected_messages += len(objects) elif object_type in ('origin_visit',): for object_ in objects: object_ = object_.copy() origin_id = storage.origin_add_one(object_.pop('origin')) - del object_['type'] - visit = method(origin=origin_id, date=object_.pop('date')) + visit = method(origin=origin_id, date=object_.pop('date'), + type=object_.pop('type')) expected_messages += 1 visit_id = visit['visit'] storage.origin_visit_update(origin_id, visit_id, **object_) expected_messages += 1 else: assert False, object_type assert_written(consumer, kafka_prefix, expected_messages) diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py index eea01e6..1adb2e7 100644 --- a/swh/journal/tests/test_replay.py +++ b/swh/journal/tests/test_replay.py @@ -1,207 +1,206 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import functools import random from subprocess import Popen from typing import Tuple import dateutil from confluent_kafka import Producer from hypothesis import strategies, given, settings from swh.storage import get_storage from swh.storage.in_memory import ENABLE_ORIGIN_IDS from swh.journal.client import JournalClient from swh.journal.serializers import key_to_kafka, value_to_kafka from swh.journal.replay import process_replay_objects, is_hash_in_bytearray from .conftest import OBJECT_TYPE_KEYS from .utils import MockedJournalClient, MockedKafkaWriter def test_storage_play( kafka_prefix: str, kafka_server: Tuple[Popen, int]): (_, port) = kafka_server kafka_prefix += '.swh.journal.objects' storage = get_storage('memory', {}) producer = Producer({ 'bootstrap.servers': 'localhost:{}'.format(port), 'client.id': 'test producer', 'enable.idempotence': 'true', }) now = datetime.datetime.now(tz=datetime.timezone.utc) # Fill Kafka nb_sent = 0 nb_visits = 0 for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items(): topic = kafka_prefix + '.' + object_type for object_ in objects: key = bytes(random.randint(0, 255) for _ in range(40)) object_ = object_.copy() if object_type == 'content': object_['ctime'] = now elif object_type == 'origin_visit': nb_visits += 1 object_['visit'] = nb_visits producer.produce( topic=topic, key=key_to_kafka(key), value=value_to_kafka(object_), ) nb_sent += 1 producer.flush() # Fill the storage from Kafka config = { 'brokers': 'localhost:%d' % kafka_server[1], 'group_id': 'replayer', 'prefix': kafka_prefix, 'max_messages': nb_sent, } replayer = JournalClient(**config) worker_fn = functools.partial(process_replay_objects, storage=storage) nb_inserted = 0 while nb_inserted < nb_sent: nb_inserted += replayer.process(worker_fn) assert nb_sent == nb_inserted # Check the objects were actually inserted in the storage assert OBJECT_TYPE_KEYS['revision'][1] == \ list(storage.revision_get( [rev['id'] for rev in OBJECT_TYPE_KEYS['revision'][1]])) assert OBJECT_TYPE_KEYS['release'][1] == \ list(storage.release_get( [rel['id'] for rel in OBJECT_TYPE_KEYS['release'][1]])) origins = list(storage.origin_get( [orig for orig in OBJECT_TYPE_KEYS['origin'][1]])) assert OBJECT_TYPE_KEYS['origin'][1] == \ - [{'url': orig['url'], 'type': orig['type']} for orig in origins] + [{'url': orig['url']} for orig in origins] for origin in origins: origin_id_or_url = \ origin['id'] if ENABLE_ORIGIN_IDS else origin['url'] expected_visits = [ { **visit, 'origin': origin_id_or_url, 'date': dateutil.parser.parse(visit['date']), } for visit in OBJECT_TYPE_KEYS['origin_visit'][1] if visit['origin']['url'] == origin['url'] - and visit['origin']['type'] == origin['type'] ] actual_visits = list(storage.origin_visit_get( origin_id_or_url)) for visit in actual_visits: del visit['visit'] # opaque identifier assert expected_visits == actual_visits contents = list(storage.content_get_metadata( [cont['sha1'] for cont in OBJECT_TYPE_KEYS['content'][1]])) assert None not in contents assert contents == OBJECT_TYPE_KEYS['content'][1] def _test_write_replay_origin_visit(visits): """Helper function to write tests for origin_visit. Each visit (a dict) given in the 'visits' argument will be sent to a (mocked) kafka queue, which a in-memory-storage backed replayer is listening to. Check that corresponding origin visits entities are present in the storage and have correct values. """ queue = [] replayer = MockedJournalClient(queue) writer = MockedKafkaWriter(queue) # Note that flipping the order of these two insertions will crash # the test, because the legacy origin_format does not allow to create # the origin when needed (type is missing) writer.send('origin', 'foo', { 'url': 'http://example.com/', 'type': 'git', }) for visit in visits: writer.send('origin_visit', 'foo', visit) queue_size = len(queue) storage = get_storage('memory', {}) worker_fn = functools.partial(process_replay_objects, storage=storage) nb_messages = 0 while nb_messages < queue_size: nb_messages += replayer.process(worker_fn) actual_visits = list(storage.origin_visit_get('http://example.com/')) assert len(actual_visits) == len(visits), actual_visits for vin, vout in zip(visits, actual_visits): vin = vin.copy() vout = vout.copy() if ENABLE_ORIGIN_IDS: assert vout.pop('origin') == 1 else: assert vout.pop('origin') == 'http://example.com/' vin.pop('origin') vin.setdefault('type', 'git') vin.setdefault('metadata', None) assert vin == vout def test_write_replay_legacy_origin_visit1(): """Test origin_visit when the 'origin' is just a string.""" now = datetime.datetime.now() visits = [{ 'visit': 1, 'origin': 'http://example.com/', 'date': now, 'type': 'git', 'status': 'partial', 'snapshot': None, }] _test_write_replay_origin_visit(visits) def test_write_replay_legacy_origin_visit2(): """Test origin_visit when 'type' is missing.""" now = datetime.datetime.now() visits = [{ 'visit': 1, 'origin': { 'url': 'http://example.com/', 'type': 'git', }, 'date': now, 'type': 'git', 'status': 'partial', 'snapshot': None, }] _test_write_replay_origin_visit(visits) hash_strategy = strategies.binary(min_size=20, max_size=20) @settings(max_examples=500) @given(strategies.sets(hash_strategy, min_size=0, max_size=500), strategies.sets(hash_strategy, min_size=10)) def test_is_hash_in_bytearray(haystack, needles): array = b''.join(sorted(haystack)) needles |= haystack # Exhaustively test for all objects in the array for needle in needles: assert is_hash_in_bytearray(needle, array, len(haystack)) == \ (needle in haystack) diff --git a/swh/journal/writer/kafka.py b/swh/journal/writer/kafka.py index 0b1522d..70ed938 100644 --- a/swh/journal/writer/kafka.py +++ b/swh/journal/writer/kafka.py @@ -1,110 +1,110 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging from confluent_kafka import Producer, KafkaException from swh.model.hashutil import DEFAULT_ALGORITHMS from swh.model.model import BaseModel from swh.journal.serializers import key_to_kafka, value_to_kafka logger = logging.getLogger(__name__) class KafkaJournalWriter: """This class is instantiated and used by swh-storage to write incoming new objects to Kafka before adding them to the storage backend (eg. postgresql) itself.""" def __init__(self, brokers, prefix, client_id): self._prefix = prefix if isinstance(brokers, str): brokers = [brokers] self.producer = Producer({ 'bootstrap.servers': ','.join(brokers), 'client.id': client_id, 'on_delivery': self._on_delivery, 'error_cb': self._error_cb, 'logger': logger, 'enable.idempotence': 'true', }) def _error_cb(self, error): if error.fatal(): raise KafkaException(error) logger.info('Received non-fatal kafka error: %s', error) def _on_delivery(self, error, message): if error is not None: self._error_cb(error) def send(self, topic, key, value): self.producer.produce( topic=topic, key=key_to_kafka(key), value=value_to_kafka(value), ) # Need to service the callbacks regularly by calling poll self.producer.poll(0) def flush(self): self.producer.flush() def _get_key(self, object_type, object_): if object_type in ('revision', 'release', 'directory', 'snapshot'): return object_['id'] elif object_type == 'content': return object_['sha1'] # TODO: use a dict of hashes elif object_type == 'skipped_content': return { hash: object_[hash] for hash in DEFAULT_ALGORITHMS } elif object_type == 'origin': - return {'url': object_['url'], 'type': object_['type']} + return {'url': object_['url']} elif object_type == 'origin_visit': return { 'origin': object_['origin'], 'date': str(object_['date']), } else: raise ValueError('Unknown object type: %s.' % object_type) def _sanitize_object(self, object_type, object_): if object_type == 'origin_visit': return { **object_, 'date': str(object_['date']), } elif object_type == 'origin': assert 'id' not in object_ return object_ def write_addition(self, object_type, object_, flush=True): """Write a single object to the journal""" if isinstance(object_, BaseModel): object_ = object_.to_dict() topic = '%s.%s' % (self._prefix, object_type) key = self._get_key(object_type, object_) object_ = self._sanitize_object(object_type, object_) logger.debug('topic: %s, key: %s, value: %s', topic, key, object_) self.send(topic, key=key, value=object_) if flush: self.flush() write_update = write_addition def write_additions(self, object_type, objects, flush=True): """Write a set of objects to the journal""" for object_ in objects: self.write_addition(object_type, object_, flush=False) if flush: self.flush()