diff --git a/swh/journal/cli.py b/swh/journal/cli.py index e0d3090..df44939 100644 --- a/swh/journal/cli.py +++ b/swh/journal/cli.py @@ -1,105 +1,89 @@ # Copyright (C) 2016-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import click import logging import os from swh.core import config from swh.storage import get_storage -from swh.journal.publisher import JournalPublisher from swh.journal.replay import StorageReplayer CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help']) @click.group(context_settings=CONTEXT_SETTINGS) @click.option('--config-file', '-C', default=None, type=click.Path(exists=True, dir_okay=False,), help="Configuration file.") @click.option('--log-level', '-l', default='INFO', type=click.Choice(logging._nameToLevel.keys()), help="Log level (default to INFO)") @click.pass_context def cli(ctx, config_file, log_level): """Software Heritage Scheduler CLI interface Default to use the the local scheduler instance (plugged to the main scheduler db). """ if not config_file: config_file = os.environ.get('SWH_CONFIG_FILENAME') if not config_file: raise ValueError('You must either pass a config-file parameter ' 'or set SWH_CONFIG_FILENAME to target ' 'the config-file') if not os.path.exists(config_file): raise ValueError('%s does not exist' % config_file) conf = config.read(config_file) ctx.ensure_object(dict) logging.basicConfig( level=log_level, format='%(asctime)s %(levelname)s %(name)s %(message)s', ) _log = logging.getLogger('kafka') _log.setLevel(logging.INFO) ctx.obj['config'] = conf ctx.obj['loglevel'] = log_level -@cli.command() -@click.pass_context -def publisher(ctx): - """Manipulate publisher - - """ - conf = ctx.obj['config'] - publisher = JournalPublisher(conf) - try: - while True: - publisher.poll() - except KeyboardInterrupt: - ctx.exit(0) - - @cli.command() @click.option('--max-messages', '-m', default=None, type=int, help='Maximum number of objects to replay. Default is to ' 'run forever.') @click.option('--broker', 'brokers', type=str, multiple=True, help='Kafka broker to connect to.') @click.option('--prefix', type=str, default='swh.journal.objects', help='Prefix of Kafka topic names to read from.') @click.option('--consumer-id', type=str, help='Name of the consumer/group id for reading from Kafka.') @click.pass_context def replay(ctx, brokers, prefix, consumer_id, max_messages): """Fill a new storage by reading a journal. 
""" conf = ctx.obj['config'] storage = get_storage(**conf.pop('storage')) replayer = StorageReplayer(brokers, prefix, consumer_id) try: replayer.fill(storage, max_messages=max_messages) except KeyboardInterrupt: ctx.exit(0) else: print('Done.') def main(): return cli(auto_envvar_prefix='SWH_JOURNAL') if __name__ == '__main__': main() diff --git a/swh/journal/publisher.py b/swh/journal/publisher.py deleted file mode 100644 index 1bf7f58..0000000 --- a/swh/journal/publisher.py +++ /dev/null @@ -1,222 +0,0 @@ -# Copyright (C) 2016-2019 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -from collections import defaultdict -import logging - -from kafka import KafkaProducer, KafkaConsumer - -from swh.storage import get_storage -from swh.storage.algos import snapshot - -from .serializers import kafka_to_key, key_to_kafka - -logger = logging.getLogger(__name__) - - -MANDATORY_KEYS = [ - 'brokers', 'temporary_prefix', 'final_prefix', 'consumer_id', - 'publisher_id', 'object_types', 'storage' -] - - -class JournalPublisher: - """The journal publisher is a layer in charge of: - - - consuming messages from topics (1 topic per object_type) - - reify the object ids read from those topics (using the storage) - - producing those reified objects to output topics (1 topic per - object type) - - The main entry point for this class is the 'poll' method. - - """ - def __init__(self, config): - self.config = config - self.check_config(config) - self._prepare_storage(config) - self._prepare_journal(config) - self.max_messages = self.config['max_messages'] - logger.setLevel(logging.DEBUG) - - def check_config(self, config): - """Check the configuration is fine. - - If not raise an error. - - """ - missing_keys = [] - for key in MANDATORY_KEYS: - if not config.get(key): - missing_keys.append(key) - - if missing_keys: - raise ValueError( - 'Configuration error: The following keys must be' - ' provided: %s' % (','.join(missing_keys), )) - - def _prepare_journal(self, config): - """Prepare the consumer and subscriber instances for the publisher to - actually be able to discuss with the journal. - - """ - # yes, the temporary topics contain values that are actually _keys_ - self.consumer = KafkaConsumer( - bootstrap_servers=config['brokers'], - value_deserializer=kafka_to_key, - auto_offset_reset='earliest', - enable_auto_commit=False, - group_id=config['consumer_id'], - ) - self.producer = KafkaProducer( - bootstrap_servers=config['brokers'], - key_serializer=key_to_kafka, - value_serializer=key_to_kafka, - client_id=config['publisher_id'], - ) - - logger.info('Subscribing to object types event: %s' % ( - config['object_types'], )) - self.consumer.subscribe( - topics=['%s.%s' % (config['temporary_prefix'], object_type) - for object_type in config['object_types']], - ) - - def _prepare_storage(self, config): - """Prepare the storage instance needed for the publisher to be able to - discuss with the storage to retrieve the objects. - - """ - self.storage = get_storage(**config['storage']) - - def poll(self, max_messages=None): - """Process a batch of messages from the consumer's topics. Use the - storage to reify those ids. Produces back those reified - objects to the production topics. - - This method polls a given amount of message then stops. - The number of messages to consume is either provided or - configured as fallback. 
- - The following method is expected to be called from within a - loop. - - """ - messages = defaultdict(list) - if max_messages is None: - max_messages = self.max_messages - - for num, message in enumerate(self.consumer): - object_type = message.topic.split('.')[-1] - logger.debug('num: %s, object_type: %s, message: %s' % ( - num+1, object_type, message)) - messages[object_type].append(message.value) - if num + 1 >= self.max_messages: - break - - logger.debug('number of messages: %s', num+1) - - new_objects = self.process_objects(messages) - self.produce_messages(new_objects) - self.consumer.commit() - - def process_objects(self, messages): - """Given a dict of messages {object type: [object id]}, reify those - ids to swh object from the storage and returns a - corresponding dict. - - Args: - messages (dict): Dict of {object_type: [id-as-bytes]} - - Returns: - Dict of {object_type: [tuple]}. - - object_type (str): content, revision, release - tuple (bytes, dict): object id as bytes, object as swh dict. - - """ - processors = { - 'content': self.process_contents, - 'revision': self.process_revisions, - 'release': self.process_releases, - 'snapshot': self.process_snapshots, - 'origin': self.process_origins, - 'origin_visit': self.process_origin_visits, - } - - return { - key: processors[key](value) - for key, value in messages.items() - } - - def produce_messages(self, messages): - """Produce new swh object to the producer topic. - - Args: - messages ([dict]): Dict of {object_type: [tuple]}. - - object_type (str): content, revision, release - tuple (bytes, dict): object id as bytes, object as swh dict. - - """ - for object_type, objects in messages.items(): - topic = '%s.%s' % (self.config['final_prefix'], object_type) - for key, object in objects: - logger.debug('topic: %s, key: %s, value: %s' % ( - topic, key, object)) - self.producer.send(topic, key=key, value=object) - - self.producer.flush() - - def process_contents(self, content_objs): - logger.debug('contents: %s' % content_objs) - metadata = self.storage.content_get_metadata( - (c[b'sha1'] for c in content_objs)) - return [(content['sha1'], content) for content in metadata] - - def process_revisions(self, revision_objs): - logger.debug('revisions: %s' % revision_objs) - metadata = self.storage.revision_get((r[b'id'] for r in revision_objs)) - return [(revision['id'], revision) - for revision in metadata if revision] - - def process_releases(self, release_objs): - logger.debug('releases: %s' % release_objs) - metadata = self.storage.release_get((r[b'id'] for r in release_objs)) - return [(release['id'], release) for release in metadata] - - def process_origins(self, origin_objs): - logger.debug('origins: %s' % origin_objs) - r = [] - for o in origin_objs: - origin = {'url': o[b'url'], 'type': o[b'type']} - r.append((origin, origin)) - return r - - def process_origin_visits(self, origin_visits): - logger.debug('origin_visits: %s' % origin_visits) - metadata = [] - for ov in origin_visits: - origin_visit = self.storage.origin_visit_get_by( - ov[b'origin'], ov[b'visit']) - if origin_visit: - pk = ov[b'origin'], ov[b'visit'] - origin_visit['date'] = str(origin_visit['date']) - metadata.append((pk, origin_visit)) - return metadata - - def process_snapshots(self, snapshot_objs): - logger.debug('snapshots: %s' % snapshot_objs) - metadata = [] - for snap in snapshot_objs: - full_obj = snapshot.snapshot_get_all_branches( - self.storage, snap[b'id']) - metadata.append((full_obj['id'], full_obj)) - - return metadata - - -if __name__ == 
'__main__': - print('Please use the "swh-journal publisher run" command') diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py index 62ef89e..80e8d90 100644 --- a/swh/journal/tests/conftest.py +++ b/swh/journal/tests/conftest.py @@ -1,270 +1,218 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import pytest import logging import random import string -from kafka import KafkaProducer, KafkaConsumer +from kafka import KafkaConsumer from subprocess import Popen from typing import Tuple, Dict from pathlib import Path from pytest_kafka import ( make_zookeeper_process, make_kafka_server, constants ) -from swh.journal.publisher import JournalPublisher from swh.model.hashutil import hash_to_bytes -from swh.journal.serializers import kafka_to_key, key_to_kafka, kafka_to_value +from swh.journal.serializers import kafka_to_key, kafka_to_value CONTENTS = [ { 'length': 3, 'sha1': hash_to_bytes( '34973274ccef6ab4dfaaf86599792fa9c3fe4689'), 'sha1_git': b'foo', 'blake2s256': b'bar', 'sha256': b'baz', 'status': 'visible', }, ] COMMITTER = [ { 'id': 1, 'fullname': b'foo', }, { 'id': 2, 'fullname': b'bar', } ] REVISIONS = [ { 'id': hash_to_bytes('7026b7c1a2af56521e951c01ed20f255fa054238'), 'message': b'hello', 'date': { 'timestamp': { 'seconds': 1234567891, 'microseconds': 0, }, 'offset': 120, 'negative_utc': None, }, 'committer': COMMITTER[0], 'author': COMMITTER[0], 'committer_date': None, }, { 'id': hash_to_bytes('368a48fe15b7db2383775f97c6b247011b3f14f4'), 'message': b'hello again', 'date': { 'timestamp': { 'seconds': 1234567892, 'microseconds': 0, }, 'offset': 120, 'negative_utc': None, }, 'committer': COMMITTER[1], 'author': COMMITTER[1], 'committer_date': None, }, ] RELEASES = [ { 'id': hash_to_bytes('d81cc0710eb6cf9efd5b920a8453e1e07157b6cd'), 'name': b'v0.0.1', 'date': { 'timestamp': { 'seconds': 1234567890, 'microseconds': 0, }, 'offset': 120, 'negative_utc': None, }, 'author': COMMITTER[0], }, ] ORIGINS = [ { 'url': 'https://somewhere.org/den/fox', 'type': 'git', }, { 'url': 'https://overtherainbow.org/fox/den', 'type': 'svn', } ] ORIGIN_VISITS = [ { 'origin': ORIGINS[0], 'date': '2013-05-07 04:20:39.369271+00:00', 'snapshot': None, # TODO 'status': 'ongoing', # TODO 'metadata': {'foo': 'bar'}, }, { 'origin': ORIGINS[0], 'date': '2018-11-27 17:20:39+00:00', 'snapshot': None, # TODO 'status': 'ongoing', # TODO 'metadata': {'baz': 'qux'}, } ] # From type to tuple (id, ) OBJECT_TYPE_KEYS = { 'content': ('sha1', CONTENTS), 'revision': ('id', REVISIONS), 'release': ('id', RELEASES), 'origin': (None, ORIGINS), 'origin_visit': (None, ORIGIN_VISITS), } -class JournalPublisherTest(JournalPublisher): - """A journal publisher which override the default configuration - parsing setup. 
- - """ - def _prepare_storage(self, config): - super()._prepare_storage(config) - self.storage.content_add({'data': b'42', **c} for c in CONTENTS) - self.storage.revision_add(REVISIONS) - self.storage.release_add(RELEASES) - origins = self.storage.origin_add(ORIGINS) - origin_visits = [] - for i, ov in enumerate(ORIGIN_VISITS): - origin_id = origins[i]['id'] - ov = self.storage.origin_visit_add(origin_id, ov['date']) - origin_visits.append(ov) - self.origins = origins - self.origin_visits = origin_visits - - KAFKA_ROOT = os.environ.get('SWH_KAFKA_ROOT') KAFKA_ROOT = KAFKA_ROOT if KAFKA_ROOT else os.path.dirname(__file__) + '/kafka' if not os.path.exists(KAFKA_ROOT): msg = ('Development error: %s must exist and target an ' 'existing kafka installation' % KAFKA_ROOT) raise ValueError(msg) KAFKA_SCRIPTS = Path(KAFKA_ROOT) / 'bin' KAFKA_BIN = str(KAFKA_SCRIPTS / 'kafka-server-start.sh') ZOOKEEPER_BIN = str(KAFKA_SCRIPTS / 'zookeeper-server-start.sh') # Those defines fixtures zookeeper_proc = make_zookeeper_process(ZOOKEEPER_BIN, scope='session') os.environ['KAFKA_LOG4J_OPTS'] = \ '-Dlog4j.configuration=file:%s/log4j.properties' % \ os.path.dirname(__file__) kafka_server = make_kafka_server(KAFKA_BIN, 'zookeeper_proc', scope='session') logger = logging.getLogger('kafka') logger.setLevel(logging.WARN) @pytest.fixture(scope='function') def kafka_prefix(): return ''.join(random.choice(string.ascii_lowercase) for _ in range(10)) TEST_CONFIG = { 'temporary_prefix': 'swh.tmp_journal.new', 'final_prefix': 'swh.journal.objects', 'consumer_id': 'swh.journal.publisher', 'publisher_id': 'swh.journal.publisher', 'object_types': OBJECT_TYPE_KEYS.keys(), 'max_messages': 1, # will read 1 message and stops 'storage': {'cls': 'memory', 'args': {}}, } @pytest.fixture def test_config(kafka_server: Tuple[Popen, int], kafka_prefix: str): """Test configuration needed for publisher/producer/consumer """ _, port = kafka_server return { **TEST_CONFIG, 'brokers': ['localhost:{}'.format(port)], 'temporary_prefix': kafka_prefix + '.swh.tmp_journal.new', 'final_prefix': kafka_prefix + '.swh.journal.objects', } @pytest.fixture -def producer_to_publisher( - kafka_server: Tuple[Popen, int], - test_config: Dict, -) -> KafkaProducer: # noqa - """Producer to send message to the publisher's consumer. - - """ - _, port = kafka_server - producer = KafkaProducer( - bootstrap_servers='localhost:{}'.format(port), - key_serializer=key_to_kafka, - value_serializer=key_to_kafka, - client_id=test_config['consumer_id'], - ) - return producer - - -@pytest.fixture -def consumer_from_publisher(kafka_server: Tuple[Popen, int], - test_config: Dict) -> KafkaConsumer: +def consumer( + kafka_server: Tuple[Popen, int], test_config: Dict) -> KafkaConsumer: """Get a connected Kafka consumer. """ kafka_topics = [ '%s.%s' % (test_config['final_prefix'], object_type) for object_type in test_config['object_types']] _, kafka_port = kafka_server consumer = KafkaConsumer( *kafka_topics, bootstrap_servers='localhost:{}'.format(kafka_port), consumer_timeout_ms=constants.DEFAULT_CONSUMER_TIMEOUT_MS, key_deserializer=kafka_to_key, value_deserializer=kafka_to_value, auto_offset_reset='earliest', enable_auto_commit=True, group_id="test-consumer" ) # Enforce auto_offset_reset=earliest even if the consumer was created # too soon wrt the server. 
while len(consumer.assignment()) == 0: consumer.poll(timeout_ms=20) consumer.seek_to_beginning() return consumer - - -@pytest.fixture -def publisher(kafka_server: Tuple[Popen, int], - test_config: Dict) -> JournalPublisher: - """Test Publisher factory. We cannot use a fixture here as we need to - modify the sample. - - """ - # consumer and producer of the publisher needs to discuss with the - # right instance - publisher = JournalPublisherTest(test_config) - return publisher diff --git a/swh/journal/tests/test_direct_writer.py b/swh/journal/tests/test_direct_writer.py index 80e1819..e63db5a 100644 --- a/swh/journal/tests/test_direct_writer.py +++ b/swh/journal/tests/test_direct_writer.py @@ -1,105 +1,105 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict import datetime import time from kafka import KafkaConsumer from subprocess import Popen from typing import Tuple from swh.storage import get_storage from swh.journal.direct_writer import DirectKafkaWriter from swh.journal.serializers import value_to_kafka, kafka_to_value from .conftest import OBJECT_TYPE_KEYS def assert_written(consumer, kafka_prefix): time.sleep(0.1) # Without this, some messages are missing consumed_objects = defaultdict(list) for msg in consumer: consumed_objects[msg.topic].append((msg.key, msg.value)) for (object_type, (key_name, objects)) in OBJECT_TYPE_KEYS.items(): topic = kafka_prefix + '.' + object_type (keys, values) = zip(*consumed_objects[topic]) if key_name: assert list(keys) == [object_[key_name] for object_ in objects] else: pass # TODO if object_type == 'origin_visit': for value in values: del value['visit'] elif object_type == 'content': for value in values: del value['ctime'] for object_ in objects: assert kafka_to_value(value_to_kafka(object_)) in values def test_direct_writer( kafka_prefix: str, kafka_server: Tuple[Popen, int], - consumer_from_publisher: KafkaConsumer): + consumer: KafkaConsumer): kafka_prefix += '.swh.journal.objects' config = { 'brokers': 'localhost:%d' % kafka_server[1], 'client_id': 'direct_writer', 'prefix': kafka_prefix, } writer = DirectKafkaWriter(**config) for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items(): for (num, object_) in enumerate(objects): if object_type == 'origin_visit': object_ = {**object_, 'visit': num} if object_type == 'content': object_ = {**object_, 'ctime': datetime.datetime.now()} writer.write_addition(object_type, object_) - assert_written(consumer_from_publisher, kafka_prefix) + assert_written(consumer, kafka_prefix) def test_storage_direct_writer( kafka_prefix: str, kafka_server: Tuple[Popen, int], - consumer_from_publisher: KafkaConsumer): + consumer: KafkaConsumer): kafka_prefix += '.swh.journal.objects' config = { 'brokers': 'localhost:%d' % kafka_server[1], 'client_id': 'direct_writer', 'prefix': kafka_prefix, } storage = get_storage('memory', {'journal_writer': { 'cls': 'kafka', 'args': config}}) for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items(): method = getattr(storage, object_type + '_add') if object_type in ('content', 'directory', 'revision', 'release', 'snapshot', 'origin'): if object_type == 'content': objects = [{**obj, 'data': b''} for obj in objects] method(objects) elif object_type in ('origin_visit',): for object_ in objects: object_ = object_.copy() origin_id = 
storage.origin_add_one(object_.pop('origin')) visit = method(origin=origin_id, date=object_.pop('date')) visit_id = visit['visit'] storage.origin_visit_update(origin_id, visit_id, **object_) else: assert False, object_type - assert_written(consumer_from_publisher, kafka_prefix) + assert_written(consumer, kafka_prefix) diff --git a/swh/journal/tests/test_publisher_kafka.py b/swh/journal/tests/test_publisher_kafka.py deleted file mode 100644 index 718b069..0000000 --- a/swh/journal/tests/test_publisher_kafka.py +++ /dev/null @@ -1,93 +0,0 @@ -# Copyright (C) 2018-2019 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - - -from kafka import KafkaConsumer, KafkaProducer -from subprocess import Popen -from typing import Tuple - -from swh.journal.serializers import value_to_kafka, kafka_to_value -from swh.journal.publisher import JournalPublisher - -from .conftest import OBJECT_TYPE_KEYS - - -def assert_publish_ok(publisher: JournalPublisher, - consumer_from_publisher: KafkaConsumer, - producer_to_publisher: KafkaProducer, - test_config: dict, - object_type: str): - """Assert that publishing object in the publisher is reified and - published in output topics. - - Args: - publisher (JournalPublisher): publisher to read and write data - consumer_from_publisher (KafkaConsumer): To read data from the - publisher - producer_to_publisher (KafkaProducer): To send data to the publisher - object_type (str): Object type to look for (e.g content, revision, - etc...) - - """ - # object type's id label key - object_key_id, expected_objects = OBJECT_TYPE_KEYS[object_type] - # objects to send to the publisher - if object_key_id: - objects = [{object_key_id: c[object_key_id]} - for c in expected_objects] - else: - # TODO: add support for origin and origin_visit - return - - # send message to the publisher - for obj in objects: - producer_to_publisher.send( - '%s.%s' % (test_config['temporary_prefix'], object_type), - obj - ) - - nb_messages = len(objects) - - for _ in range(nb_messages): - publisher.poll(max_messages=1) - - # then (client reads from the messages from output topic) - expected_topic = '%s.%s' % (test_config['final_prefix'], object_type) - expected_msgs = [ - ( - object_[object_key_id], - kafka_to_value(value_to_kafka(object_)) - ) - for object_ in expected_objects] - - msgs = list(consumer_from_publisher) - assert all(msg.topic == expected_topic for msg in msgs) - assert [(msg.key, msg.value) for msg in msgs] == expected_msgs - - -def test_publish( - publisher: JournalPublisher, - kafka_server: Tuple[Popen, int], - test_config: dict, - consumer_from_publisher: KafkaConsumer, - producer_to_publisher: KafkaProducer): - """ - Reading from and writing to the journal publisher should work (contents) - - Args: - journal_publisher (JournalPublisher): publisher to read and write data - consumer_from_publisher (KafkaConsumer): To read data from publisher - producer_to_publisher (KafkaProducer): To send data to publisher - - """ - # retrieve the object types we want to test - object_types = OBJECT_TYPE_KEYS.keys() - # Now for each object type, we'll send data to the publisher and - # check that data is indeed fetched and reified in the publisher's - # output topics - for object_type in object_types: - assert_publish_ok( - publisher, consumer_from_publisher, producer_to_publisher, - test_config, object_type) diff --git 
a/swh/journal/tests/test_publisher_no_kafka.py b/swh/journal/tests/test_publisher_no_kafka.py deleted file mode 100644 index 10f23fd..0000000 --- a/swh/journal/tests/test_publisher_no_kafka.py +++ /dev/null @@ -1,151 +0,0 @@ -# Copyright (C) 2018-2019 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import pytest -import unittest - -from .conftest import ( - JournalPublisherTest, TEST_CONFIG, - CONTENTS, REVISIONS, RELEASES, ORIGINS -) -from swh.journal.publisher import MANDATORY_KEYS - - -class JournalPublisherNoKafkaInMemoryStorage(JournalPublisherTest): - """A journal publisher with: - - no kafka dependency - - in-memory storage - - """ - def check_config(self, config): - """No need to check the configuration here as we do not use kafka - - """ - pass - - def _prepare_journal(self, config): - """No journal for now - - """ - pass - - -class TestPublisherNoKafka(unittest.TestCase): - """This tests only the part not using any kafka instance - - """ - def setUp(self): - self.publisher = JournalPublisherNoKafkaInMemoryStorage(TEST_CONFIG) - self.contents = [{b'sha1': c['sha1']} for c in CONTENTS] - self.revisions = [{b'id': c['id']} for c in REVISIONS] - self.releases = [{b'id': c['id']} for c in RELEASES] - # those needs id generation from the storage - # so initialization is different than other entities - self.origins = [{b'url': o['url'], - b'type': o['type']} - for o in self.publisher.origins] - self.origin_visits = [{b'origin': ov['origin'], - b'visit': ov['visit']} - for ov in self.publisher.origin_visits] - # full objects - storage = self.publisher.storage - ovs = [] - for ov in self.origin_visits: - _ov = storage.origin_visit_get_by( - ov[b'origin'], ov[b'visit']) - _ov['date'] = str(_ov['date']) - ovs.append(_ov) - self.expected_origin_visits = ovs - - def test_process_contents(self): - actual_contents = self.publisher.process_contents(self.contents) - expected_contents = [(c['sha1'], c) for c in CONTENTS] - self.assertEqual(actual_contents, expected_contents) - - def test_process_revisions(self): - actual_revisions = self.publisher.process_revisions(self.revisions) - expected_revisions = [(c['id'], c) for c in REVISIONS] - self.assertEqual(actual_revisions, expected_revisions) - - def test_process_releases(self): - actual_releases = self.publisher.process_releases(self.releases) - expected_releases = [(c['id'], c) for c in RELEASES] - self.assertEqual(actual_releases, expected_releases) - - def test_process_origins(self): - actual_origins = self.publisher.process_origins(self.origins) - expected_origins = [({'url': o[b'url'], 'type': o[b'type']}, - {'url': o[b'url'], 'type': o[b'type']}) - for o in self.origins] - self.assertEqual(actual_origins, expected_origins) - - def test_process_origin_visits(self): - actual_ovs = self.publisher.process_origin_visits(self.origin_visits) - expected_ovs = [((ov['origin'], ov['visit']), ov) - for ov in self.expected_origin_visits] - self.assertEqual(actual_ovs, expected_ovs) - - def test_process_objects(self): - messages = { - 'content': self.contents, - 'revision': self.revisions, - 'release': self.releases, - 'origin': self.origins, - 'origin_visit': self.origin_visits, - } - - actual_objects = self.publisher.process_objects(messages) - - expected_contents = [(c['sha1'], c) for c in CONTENTS] - expected_revisions = [(c['id'], c) for c in REVISIONS] - 
expected_releases = [(c['id'], c) for c in RELEASES] - expected_origins = [(o, o) for o in ORIGINS] - expected_ovs = [((ov['origin'], ov['visit']), ov) - for ov in self.expected_origin_visits] - expected_objects = { - 'content': expected_contents, - 'revision': expected_revisions, - 'release': expected_releases, - 'origin': expected_origins, - 'origin_visit': expected_ovs, - } - - self.assertEqual(actual_objects, expected_objects) - - -class JournalPublisherCheckTest(JournalPublisherTest): - """A journal publisher with: - - no kafka dependency - - in-memory storage - - """ - def _prepare_journal(self, config): - """No journal for now - - """ - pass - - -def test_check_config_ok(test_config): - """Instantiate a publisher with the right config is fine - - """ - publisher = JournalPublisherCheckTest(test_config) - assert publisher is not None - - -def test_check_config_ko(test_config): - """Instantiate a publisher with the wrong config should raise - - """ - for k in MANDATORY_KEYS: - conf = test_config.copy() - conf.pop(k) - with pytest.raises(ValueError) as e: - JournalPublisherCheckTest(conf) - - error = ('Configuration error: The following keys must be' - ' provided: %s' % (','.join([k]), )) - assert e.value.args[0] == error
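For context on the surviving `replay` entry point in swh/journal/cli.py: it can be driven programmatically through click's test runner. This is a minimal sketch, assuming a YAML config whose only required key is the `storage` section that the command pops from the loaded config; the broker address, topic prefix and consumer id below are placeholder values, and a reachable Kafka broker is needed for the replay itself to make progress.

import tempfile

from click.testing import CliRunner

from swh.journal.cli import cli

config_yaml = """
storage:
  cls: memory
  args: {}
"""

with tempfile.NamedTemporaryFile(suffix='.yml', mode='w') as cfg:
    cfg.write(config_yaml)
    cfg.flush()
    # group options (--config-file) come before the subcommand name
    result = CliRunner().invoke(cli, [
        '--config-file', cfg.name,
        'replay',
        '--broker', 'localhost:9092',        # placeholder broker
        '--prefix', 'swh.journal.objects',
        '--consumer-id', 'test-replayer',    # hypothetical group id
        '--max-messages', '1',
    ])
    print(result.output)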
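swh/journal/replay.py itself is not part of this diff, so the following is only a rough sketch of the kind of loop StorageReplayer.fill(storage, max_messages=...) could run, assuming topics named '<prefix>.<object_type>' whose values deserialize to swh dicts. The real implementation very likely special-cases some types (journal content messages carry no 'data' field, and origin visits need their origin resolved first), which this sketch ignores.

from kafka import KafkaConsumer

from swh.journal.serializers import kafka_to_key, kafka_to_value


class SketchStorageReplayer:
    """Illustrative stand-in for swh.journal.replay.StorageReplayer."""

    def __init__(self, brokers, prefix, consumer_id,
                 object_types=('revision', 'release', 'snapshot')):
        self.consumer = KafkaConsumer(
            *('%s.%s' % (prefix, object_type) for object_type in object_types),
            bootstrap_servers=brokers,
            key_deserializer=kafka_to_key,
            value_deserializer=kafka_to_value,
            auto_offset_reset='earliest',
            enable_auto_commit=False,
            group_id=consumer_id,
        )

    def fill(self, storage, max_messages=None):
        for num, message in enumerate(self.consumer, start=1):
            object_type = message.topic.split('.')[-1]
            # each topic maps to the matching storage endpoint, e.g. revision_add
            getattr(storage, object_type + '_add')([message.value])
            self.consumer.commit()
            if max_messages and num >= max_messages:
                break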
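The publisher removal leans on the journal writer path that the remaining tests exercise: a storage configured with a 'kafka' journal_writer publishes objects to the final '<prefix>.<object_type>' topics directly from its *_add calls, so no intermediate reification step is needed. A minimal sketch of that path, reusing the CONTENTS fixture shape from conftest.py (the broker address is a placeholder):

from swh.model.hashutil import hash_to_bytes
from swh.storage import get_storage

writer_config = {
    'brokers': 'localhost:9092',   # placeholder broker
    'client_id': 'direct_writer',
    'prefix': 'swh.journal.objects',
}
storage = get_storage('memory', {'journal_writer': {'cls': 'kafka',
                                                    'args': writer_config}})

# Adding an object through the storage also publishes it on
# 'swh.journal.objects.content'; no separate publisher process is involved.
storage.content_add([{
    'data': b'foo',
    'length': 3,
    'sha1': hash_to_bytes('34973274ccef6ab4dfaaf86599792fa9c3fe4689'),
    'sha1_git': b'foo',
    'blake2s256': b'bar',
    'sha256': b'baz',
    'status': 'visible',
}])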
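A note on the assertions in test_direct_writer: expected values are compared after a serializer round trip rather than against the raw fixture dicts, because encoding and decoding may normalize rich values (dates in particular). For plain bytes/str fields the round trip should be the identity, as in this small check:

from swh.journal.serializers import value_to_kafka, kafka_to_value
from swh.model.hashutil import hash_to_bytes

release = {
    'id': hash_to_bytes('d81cc0710eb6cf9efd5b920a8453e1e07157b6cd'),
    'name': b'v0.0.1',
}
assert kafka_to_value(value_to_kafka(release)) == release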