diff --git a/swh/journal/backfill.py b/swh/journal/backfill.py new file mode 100644 --- /dev/null +++ b/swh/journal/backfill.py @@ -0,0 +1,453 @@ +# Copyright (C) 2017-2019 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +"""Module defining journal backfiller classes. + +The backfiller's goal is to send back part or all of the objects +from the storage to the journal topics. + +At the moment, a first naive implementation is the +JournalBackfiller. It simply reads the objects from the +storage and sends each of them back to the journal. + +""" + +import logging + +from .direct_writer import DirectKafkaWriter + +from swh.core.db import BaseDb +from swh.storage.converters import db_to_release, db_to_revision + + +logger = logging.getLogger(__name__) + +PARTITION_KEY = { + 'content': 'sha1', + 'skipped_content': None, # unused + 'directory': 'id', + 'revision': 'revision.id', + 'release': 'release.id', + 'snapshot': 'id', + 'origin': 'id', + 'origin_visit': 'origin_visit.origin', +} + +COLUMNS = { + 'content': [ + 'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', 'status', + 'ctime' + ], + 'skipped_content': [ + 'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', 'ctime', + 'status', 'reason', + ], + 'directory': ['id', 'dir_entries', 'file_entries', 'rev_entries'], + 'revision': [ + ("revision.id", "id"), + "date", + "date_offset", + "committer_date", + "committer_date_offset", + "type", + "directory", + "message", + "synthetic", + "metadata", + "date_neg_utc_offset", + "committer_date_neg_utc_offset", + ("array(select parent_id::bytea from revision_history rh " + "where rh.id = revision.id order by rh.parent_rank asc)", + "parents"), + ("a.id", "author_id"), + ("a.name", "author_name"), + ("a.email", "author_email"), + ("a.fullname", "author_fullname"), + ("c.id", 
"committer_id"), + ("c.name", "committer_name"), + ("c.email", "committer_email"), + ("c.fullname", "committer_fullname"), + ], + 'release': [ + ("release.id", "id"), + "date", + "date_offset", + "comment", + ("release.name", "name"), + "synthetic", + "date_neg_utc_offset", + "target", + "target_type", + ("a.id", "author_id"), + ("a.name", "author_name"), + ("a.email", "author_email"), + ("a.fullname", "author_fullname"), + ], + 'snapshot': ['id', 'object_id'], + 'origin': ['type', 'url'], + 'origin_visit': ['type', 'url', 'date', 'snapshot', 'status'], +} + + +JOINS = { + 'release': ['person a on release.author=a.id'], + 'revision': ['person a on revision.author=a.id', + 'person c on revision.committer=c.id'], + 'origin_visit': ['origin on origin_visit.origin=origin.id'], +} + + +def directory_converter(db, directory): + """Convert directory from the flat representation to swh model + compatible objects. + + """ + columns = ['target', 'name', 'perms'] + query_template = ''' + select %(columns)s + from directory_entry_%(type)s + where id in %%s + ''' + + types = ['file', 'dir', 'rev'] + + entries = [] + with db.cursor() as cur: + for type in types: + ids = directory.pop('%s_entries' % type) + if not ids: + continue + query = query_template % { + 'columns': ','.join(columns), + 'type': type, + } + cur.execute(query, (tuple(ids), )) + for row in cur: + entry = dict(zip(columns, row)) + entry['type'] = type + entries.append(entry) + + directory['entries'] = entries + return directory + + +def revision_converter(db, revision): + """Convert revision from the flat representation to swh model + compatible objects. + + """ + revision = db_to_revision(revision) + if 'author' in revision and revision['author']: + del revision['author']['id'] + if 'committer' in revision and revision['committer']: + del revision['committer']['id'] + return revision + + +def release_converter(db, release): + """Convert release from the flat representation to swh model + compatible objects. 
+ + """ + release = db_to_release(release) + if 'author' in release and release['author']: + del release['author']['id'] + return release + + +def snapshot_converter(db, snapshot): + """Convert snapshot from the flat representation to swh model + compatible objects. + + """ + columns = ['name', 'target', 'target_type'] + query = ''' + select %s + from snapshot_branches sbs + inner join snapshot_branch sb on sb.object_id=sbs.branch_id + where sbs.snapshot_id=%%s + ''' % ', '.join(columns) + with db.cursor() as cur: + cur.execute(query, (snapshot.pop('object_id'), )) + branches = {} + for name, *row in cur: + branch = dict(zip(columns[1:], row)) + if not branch['target'] and not branch['target_type']: + branch = None + branches[name] = branch + + snapshot['branches'] = branches + return snapshot + + +def origin_visit_converter(db, origin_visit): + origin = { + 'type': origin_visit.pop('type'), + 'url': origin_visit.pop('url'), + } + origin_visit['origin'] = origin + return origin_visit + + +CONVERTERS = { + 'directory': directory_converter, + 'revision': revision_converter, + 'release': release_converter, + 'snapshot': snapshot_converter, + 'origin_visit': origin_visit_converter, +} + + +def object_to_offset(object_id, numbits): + """Compute the index of the range containing object id, when dividing + space into 2^numbits. 
+ + Args: + object_id (str): The hex representation of object_id + numbits (int): Number of bits in which we divide input space + + Returns: + The index of the range containing object id + + """ + q, r = divmod(numbits, 8) + length = q + (r != 0) + shift_bits = 8 - r if r else 0 + + truncated_id = object_id[:length * 2] + if len(truncated_id) < length * 2: + truncated_id += '0' * (length * 2 - len(truncated_id)) + + truncated_id_bytes = bytes.fromhex(truncated_id) + return int.from_bytes(truncated_id_bytes, byteorder='big') >> shift_bits + + +def byte_ranges(numbits, start_object=None, end_object=None): + """Generate start/end pairs of bytes spanning numbits bits and + constrained by optional start_object and end_object. + + Args: + numbits (int): Number of bits in which we divide input space + start_object (str): Hex object id contained in the first range + returned + end_object (str): Hex object id contained in the last range + returned + + Yields: + 2^numbits pairs of bytes + + """ + q, r = divmod(numbits, 8) + length = q + (r != 0) + shift_bits = 8 - r if r else 0 + + def to_bytes(i): + return int.to_bytes(i << shift_bits, length=length, byteorder='big') + + start_offset = 0 + end_offset = 1 << numbits + + if start_object is not None: + start_offset = object_to_offset(start_object, numbits) + if end_object is not None: + end_offset = object_to_offset(end_object, numbits) + 1 + + for start in range(start_offset, end_offset): + end = start + 1 + + if start == 0: + yield None, to_bytes(end) + elif end == 1 << numbits: + yield to_bytes(start), None + else: + yield to_bytes(start), to_bytes(end) + + +def integer_ranges(start, end, block_size=1000): + for start in range(start, end, block_size): + if start == 0: + yield None, block_size + elif start + block_size > end: + yield start, end + else: + yield start, start + block_size + + +RANGE_GENERATORS = { + 'content': lambda start, end: byte_ranges(24, start, end), + 'skipped_content': lambda start, end: [(None, 
None)], + 'directory': lambda start, end: byte_ranges(24, start, end), + 'revision': lambda start, end: byte_ranges(24, start, end), + 'release': lambda start, end: byte_ranges(16, start, end), + 'snapshot': lambda start, end: byte_ranges(16, start, end), + 'origin': integer_ranges, + 'origin_visit': integer_ranges, +} + + +def compute_query(obj_type, start, end): + columns = COLUMNS.get(obj_type) + join_specs = JOINS.get(obj_type, []) + join_clause = '\n'.join('left join %s' % clause for clause in join_specs) + + where = [] + where_args = [] + if start: + where.append('%(keys)s >= %%s') + where_args.append(start) + if end: + where.append('%(keys)s < %%s') + where_args.append(end) + + where_clause = '' + if where: + where_clause = ('where ' + ' and '.join(where)) % { + 'keys': '(%s)' % PARTITION_KEY[obj_type] + } + + column_specs = [] + column_aliases = [] + for column in columns: + if isinstance(column, str): + column_specs.append(column) + column_aliases.append(column) + else: + column_specs.append('%s as %s' % column) + column_aliases.append(column[1]) + + query = ''' +select %(columns)s +from %(table)s +%(join)s +%(where)s + ''' % { + 'columns': ','.join(column_specs), + 'table': obj_type, + 'join': join_clause, + 'where': where_clause, + } + + return query, where_args, column_aliases + + +def fetch(db, obj_type, start, end): + """Fetch obj_type's objects in the given range from db. + + This opens a cursor on the given db connection and streams + objects from it. 
+ + Args: + db (BaseDb): Db connection object + obj_type (str): Object type + start (Union[bytes|Tuple]): Range start identifier + end (Union[bytes|Tuple]): Range end identifier + + Raises: + ValueError if obj_type is not supported + + Yields: + Objects in the given range + + """ + query, where_args, column_aliases = compute_query(obj_type, start, end) + converter = CONVERTERS.get(obj_type) + with db.cursor() as cursor: + logger.debug('Fetching data for table %s', obj_type) + logger.debug('query: %s %s', query, where_args) + cursor.execute(query, where_args) + for row in cursor: + record = dict(zip(column_aliases, row)) + if converter: + record = converter(db, record) + + logger.debug('record: %s' % record) + yield record + + +MANDATORY_KEYS = ['brokers', 'storage_dbconn', 'final_prefix', 'client_id'] + + +class JournalBackfiller: + """Class in charge of reading the storage's objects and sending them + back to the journal topics. + + This is designed to be run periodically. + + """ + def __init__(self, config=None): + self.config = config + self.check_config(config) + + def check_config(self, config): + missing_keys = [] + for key in MANDATORY_KEYS: + if not config.get(key): + missing_keys.append(key) + + if missing_keys: + raise ValueError( + 'Configuration error: The following keys must be' + ' provided: %s' % (','.join(missing_keys), )) + + def parse_arguments(self, object_type, start_object, end_object): + """Parse arguments + + Raises: + ValueError for unsupported object type + ValueError if object ids are not parseable + + Returns: + Parsed start and end object ids + + """ + if object_type not in COLUMNS: + raise ValueError('Object type %s is not supported. 
' + 'The only possible values are %s' % ( + object_type, ', '.join(COLUMNS.keys()))) + + if object_type in ['origin', 'origin_visit']: + if start_object: + start_object = int(start_object) + else: + start_object = 0 + if end_object: + end_object = int(end_object) + else: + end_object = 100 * 1000 * 1000 # hard-coded limit + + return start_object, end_object + + def run(self, object_type, start_object, end_object, dry_run=False): + """Reads storage's subscribed object types and send them to the + publisher's reading queue. + + """ + start_object, end_object = self.parse_arguments( + object_type, start_object, end_object) + + db = BaseDb.connect(self.config['storage_dbconn']) + writer = DirectKafkaWriter( + brokers=self.config['brokers'], + prefix=self.config['final_prefix'], + client_id=self.config['client_id'] + ) + for range_start, range_end in RANGE_GENERATORS[object_type]( + start_object, end_object): + logger.info('Processing %s range %s to %s', object_type, + range_start, range_end) + + for obj in fetch( + db, object_type, start=range_start, end=range_end, + ): + if dry_run: + continue + writer.write_addition(object_type=object_type, + object_=obj) + + writer.producer.flush() + + +if __name__ == '__main__': + print('Please use the "swh-journal backfiller run" command') diff --git a/swh/journal/checker.py b/swh/journal/checker.py deleted file mode 100644 --- a/swh/journal/checker.py +++ /dev/null @@ -1,137 +0,0 @@ -# Copyright (C) 2017 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -"""Module defining journal checker classes. - -Those checker goal is to send back all, or missing objects from the -journal queues. - -At the moment, a first naive implementation is the -SimpleCheckerProducer. 
It simply reads the objects from the -storage and sends every object identifier back to the journal. - -""" - -import psycopg2 - -from kafka import KafkaProducer - -from swh.core.config import SWHConfig -from .serializers import key_to_kafka - - -TYPE_TO_PRIMARY_KEY = { - 'origin': ['id'], - 'content': ['sha1', 'sha1_git', 'sha256'], - 'directory': ['id'], - 'revision': ['id'], - 'release': ['id'], - 'origin_visit': ['origin', 'visit'], - 'skipped_content': ['sha1', 'sha1_git', 'sha256'], -} - - -def entry_to_bytes(entry): - """Convert an entry coming from the database to bytes""" - if isinstance(entry, memoryview): - return entry.tobytes() - if isinstance(entry, tuple): - return [entry_to_bytes(value) for value in entry] - return entry - - -def fetch(db_conn, obj_type): - """Fetch all obj_type's identifiers from db. - - This opens one connection, stream objects and when done, close - the connection. - - Raises: - ValueError if obj_type is not supported - - Yields: - Identifiers for the specific object_type - - """ - primary_key = TYPE_TO_PRIMARY_KEY.get(obj_type) - if not primary_key: - raise ValueError('The object type %s is not supported. ' - 'Only possible values are %s' % ( - obj_type, TYPE_TO_PRIMARY_KEY.keys())) - - primary_key_str = ','.join(primary_key) - query = 'select %s from %s order by %s' % ( - primary_key_str, obj_type, primary_key_str) - server_side_cursor_name = 'swh.journal.%s' % obj_type - - with psycopg2.connect(db_conn) as db: - cursor = db.cursor(name=server_side_cursor_name) - cursor.execute(query) - for o in cursor: - yield dict(zip(primary_key, entry_to_bytes(o))) - - -class SimpleCheckerProducer(SWHConfig): - """Class in charge of reading the storage's objects and sends those - back to the publisher queue. - - This is designed to be run periodically. 
- - """ - DEFAULT_CONFIG = { - 'brokers': ('list[str]', ['getty.internal.softwareheritage.org']), - 'temporary_prefix': ('str', 'swh.tmp_journal.new'), - 'publisher_id': ('str', 'swh.journal.publisher.test'), - 'object_types': ('list[str]', ['content', 'revision', 'release']), - 'storage_dbconn': ('str', 'service=swh-dev'), - } - - CONFIG_BASE_FILENAME = 'journal/checker' - - def __init__(self, extra_configuration=None): - self.config = config = self.parse_config_file() - if extra_configuration: - config.update(extra_configuration) - - self.object_types = self.config['object_types'] - for obj_type in self.object_types: - if obj_type not in TYPE_TO_PRIMARY_KEY: - raise ValueError('The object type %s is not supported. ' - 'Possible values are %s' % ( - obj_type, - ', '.join(TYPE_TO_PRIMARY_KEY))) - - self.storage_dbconn = self.config['storage_dbconn'] - - self.producer = KafkaProducer( - bootstrap_servers=config['brokers'], - value_serializer=key_to_kafka, - client_id=config['publisher_id'], - ) - - def _read_storage(self): - """Read storage's objects and generates tuple as object_type, dict of - object. - - Yields: - tuple of object_type, object as dict - - """ - for obj_type in self.object_types: - for obj in fetch(self.storage_dbconn, obj_type): - yield obj_type, obj - - def run(self): - """Reads storage's subscribed object types and send them to the - publisher's reading queue. 
- - """ - for obj_type, obj in self._read_storage(): - topic = '%s.%s' % (self.config['temporary_prefix'], obj_type) - self.producer.send(topic, value=obj) - - -if __name__ == '__main__': - SimpleCheckerProducer().run() diff --git a/swh/journal/cli.py b/swh/journal/cli.py --- a/swh/journal/cli.py +++ b/swh/journal/cli.py @@ -10,8 +10,8 @@ from swh.core import config from swh.storage import get_storage -from swh.journal.publisher import JournalPublisher from swh.journal.replay import StorageReplayer +from swh.journal.backfill import JournalBackfiller CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help']) @@ -44,8 +44,10 @@ conf = config.read(config_file) ctx.ensure_object(dict) - logger = logging.getLogger(__name__) - logger.setLevel(log_level) + logging.basicConfig( + level=log_level, + format='%(asctime)s %(levelname)s %(name)s %(message)s', + ) _log = logging.getLogger('kafka') _log.setLevel(logging.INFO) @@ -54,21 +56,6 @@ ctx.obj['loglevel'] = log_level -@cli.command() -@click.pass_context -def publisher(ctx): - """Manipulate publisher - - """ - conf = ctx.obj['config'] - publisher = JournalPublisher(conf) - try: - while True: - publisher.poll() - except KeyboardInterrupt: - ctx.exit(0) - - @cli.command() @click.option('--max-messages', '-m', default=None, type=int, help='Maximum number of objects to replay. 
Default is to ' @@ -95,6 +82,27 @@ print('Done.') +@cli.command() +@click.argument('object_type') +@click.option('--start-object', default=None) +@click.option('--end-object', default=None) +@click.option('--dry-run', is_flag=True, default=False) +@click.pass_context +def backfiller(ctx, object_type, start_object, end_object, dry_run): + """Manipulate backfiller + + """ + conf = ctx.obj['config'] + backfiller = JournalBackfiller(conf) + try: + backfiller.run( + object_type=object_type, + start_object=start_object, end_object=end_object, + dry_run=dry_run) + except KeyboardInterrupt: + ctx.exit(0) + + def main(): return cli(auto_envvar_prefix='SWH_JOURNAL') diff --git a/swh/journal/direct_writer.py b/swh/journal/direct_writer.py --- a/swh/journal/direct_writer.py +++ b/swh/journal/direct_writer.py @@ -7,6 +7,8 @@ from kafka import KafkaProducer +from swh.model.hashutil import DEFAULT_ALGORITHMS + from .serializers import key_to_kafka, value_to_kafka logger = logging.getLogger(__name__) @@ -34,6 +36,11 @@ return object_['id'] elif object_type == 'content': return object_['sha1'] # TODO: use a dict of hashes + elif object_type == 'skipped_content': + return { + hash: object_[hash] + for hash in DEFAULT_ALGORITHMS + } elif object_type == 'origin': return {'url': object_['url'], 'type': object_['type']} elif object_type == 'origin_visit': diff --git a/swh/journal/publisher.py b/swh/journal/publisher.py deleted file mode 100644 --- a/swh/journal/publisher.py +++ /dev/null @@ -1,222 +0,0 @@ -# Copyright (C) 2016-2019 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -from collections import defaultdict -import logging - -from kafka import KafkaProducer, KafkaConsumer - -from swh.storage import get_storage -from swh.storage.algos import snapshot - -from .serializers import kafka_to_key, 
key_to_kafka - -logger = logging.getLogger(__name__) - - -MANDATORY_KEYS = [ - 'brokers', 'temporary_prefix', 'final_prefix', 'consumer_id', - 'publisher_id', 'object_types', 'storage' -] - - -class JournalPublisher: - """The journal publisher is a layer in charge of: - - - consuming messages from topics (1 topic per object_type) - - reify the object ids read from those topics (using the storage) - - producing those reified objects to output topics (1 topic per - object type) - - The main entry point for this class is the 'poll' method. - - """ - def __init__(self, config): - self.config = config - self.check_config(config) - self._prepare_storage(config) - self._prepare_journal(config) - self.max_messages = self.config['max_messages'] - logger.setLevel(logging.DEBUG) - - def check_config(self, config): - """Check the configuration is fine. - - If not raise an error. - - """ - missing_keys = [] - for key in MANDATORY_KEYS: - if not config.get(key): - missing_keys.append(key) - - if missing_keys: - raise ValueError( - 'Configuration error: The following keys must be' - ' provided: %s' % (','.join(missing_keys), )) - - def _prepare_journal(self, config): - """Prepare the consumer and subscriber instances for the publisher to - actually be able to discuss with the journal. 
- - """ - # yes, the temporary topics contain values that are actually _keys_ - self.consumer = KafkaConsumer( - bootstrap_servers=config['brokers'], - value_deserializer=kafka_to_key, - auto_offset_reset='earliest', - enable_auto_commit=False, - group_id=config['consumer_id'], - ) - self.producer = KafkaProducer( - bootstrap_servers=config['brokers'], - key_serializer=key_to_kafka, - value_serializer=key_to_kafka, - client_id=config['publisher_id'], - ) - - logger.info('Subscribing to object types event: %s' % ( - config['object_types'], )) - self.consumer.subscribe( - topics=['%s.%s' % (config['temporary_prefix'], object_type) - for object_type in config['object_types']], - ) - - def _prepare_storage(self, config): - """Prepare the storage instance needed for the publisher to be able to - discuss with the storage to retrieve the objects. - - """ - self.storage = get_storage(**config['storage']) - - def poll(self, max_messages=None): - """Process a batch of messages from the consumer's topics. Use the - storage to reify those ids. Produces back those reified - objects to the production topics. - - This method polls a given amount of message then stops. - The number of messages to consume is either provided or - configured as fallback. - - The following method is expected to be called from within a - loop. 
- - """ - messages = defaultdict(list) - if max_messages is None: - max_messages = self.max_messages - - for num, message in enumerate(self.consumer): - object_type = message.topic.split('.')[-1] - logger.debug('num: %s, object_type: %s, message: %s' % ( - num+1, object_type, message)) - messages[object_type].append(message.value) - if num + 1 >= self.max_messages: - break - - logger.debug('number of messages: %s', num+1) - - new_objects = self.process_objects(messages) - self.produce_messages(new_objects) - self.consumer.commit() - - def process_objects(self, messages): - """Given a dict of messages {object type: [object id]}, reify those - ids to swh object from the storage and returns a - corresponding dict. - - Args: - messages (dict): Dict of {object_type: [id-as-bytes]} - - Returns: - Dict of {object_type: [tuple]}. - - object_type (str): content, revision, release - tuple (bytes, dict): object id as bytes, object as swh dict. - - """ - processors = { - 'content': self.process_contents, - 'revision': self.process_revisions, - 'release': self.process_releases, - 'snapshot': self.process_snapshots, - 'origin': self.process_origins, - 'origin_visit': self.process_origin_visits, - } - - return { - key: processors[key](value) - for key, value in messages.items() - } - - def produce_messages(self, messages): - """Produce new swh object to the producer topic. - - Args: - messages ([dict]): Dict of {object_type: [tuple]}. - - object_type (str): content, revision, release - tuple (bytes, dict): object id as bytes, object as swh dict. 
- - """ - for object_type, objects in messages.items(): - topic = '%s.%s' % (self.config['final_prefix'], object_type) - for key, object in objects: - logger.debug('topic: %s, key: %s, value: %s' % ( - topic, key, object)) - self.producer.send(topic, key=key, value=object) - - self.producer.flush() - - def process_contents(self, content_objs): - logger.debug('contents: %s' % content_objs) - metadata = self.storage.content_get_metadata( - (c[b'sha1'] for c in content_objs)) - return [(content['sha1'], content) for content in metadata] - - def process_revisions(self, revision_objs): - logger.debug('revisions: %s' % revision_objs) - metadata = self.storage.revision_get((r[b'id'] for r in revision_objs)) - return [(revision['id'], revision) - for revision in metadata if revision] - - def process_releases(self, release_objs): - logger.debug('releases: %s' % release_objs) - metadata = self.storage.release_get((r[b'id'] for r in release_objs)) - return [(release['id'], release) for release in metadata] - - def process_origins(self, origin_objs): - logger.debug('origins: %s' % origin_objs) - r = [] - for o in origin_objs: - origin = {'url': o[b'url'], 'type': o[b'type']} - r.append((origin, origin)) - return r - - def process_origin_visits(self, origin_visits): - logger.debug('origin_visits: %s' % origin_visits) - metadata = [] - for ov in origin_visits: - origin_visit = self.storage.origin_visit_get_by( - ov[b'origin'], ov[b'visit']) - if origin_visit: - pk = ov[b'origin'], ov[b'visit'] - origin_visit['date'] = str(origin_visit['date']) - metadata.append((pk, origin_visit)) - return metadata - - def process_snapshots(self, snapshot_objs): - logger.debug('snapshots: %s' % snapshot_objs) - metadata = [] - for snap in snapshot_objs: - full_obj = snapshot.snapshot_get_all_branches( - self.storage, snap[b'id']) - metadata.append((full_obj['id'], full_obj)) - - return metadata - - -if __name__ == '__main__': - print('Please use the "swh-journal publisher run" command') diff 
--git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py --- a/swh/journal/tests/conftest.py +++ b/swh/journal/tests/conftest.py @@ -9,7 +9,7 @@ import random import string -from kafka import KafkaProducer, KafkaConsumer +from kafka import KafkaConsumer from subprocess import Popen from typing import Tuple, Dict @@ -18,10 +18,9 @@ make_zookeeper_process, make_kafka_server, constants ) -from swh.journal.publisher import JournalPublisher from swh.model.hashutil import hash_to_bytes -from swh.journal.serializers import kafka_to_key, key_to_kafka, kafka_to_value +from swh.journal.serializers import kafka_to_key, kafka_to_value CONTENTS = [ @@ -134,26 +133,6 @@ } -class JournalPublisherTest(JournalPublisher): - """A journal publisher which override the default configuration - parsing setup. - - """ - def _prepare_storage(self, config): - super()._prepare_storage(config) - self.storage.content_add({'data': b'42', **c} for c in CONTENTS) - self.storage.revision_add(REVISIONS) - self.storage.release_add(RELEASES) - origins = self.storage.origin_add(ORIGINS) - origin_visits = [] - for i, ov in enumerate(ORIGIN_VISITS): - origin_id = origins[i]['id'] - ov = self.storage.origin_visit_add(origin_id, ov['date']) - origin_visits.append(ov) - self.origins = origins - self.origin_visits = origin_visits - - KAFKA_ROOT = os.environ.get('SWH_KAFKA_ROOT') KAFKA_ROOT = KAFKA_ROOT if KAFKA_ROOT else os.path.dirname(__file__) + '/kafka' if not os.path.exists(KAFKA_ROOT): @@ -210,26 +189,8 @@ @pytest.fixture -def producer_to_publisher( - kafka_server: Tuple[Popen, int], - test_config: Dict, -) -> KafkaProducer: # noqa - """Producer to send message to the publisher's consumer. 
- - """ - _, port = kafka_server - producer = KafkaProducer( - bootstrap_servers='localhost:{}'.format(port), - key_serializer=key_to_kafka, - value_serializer=key_to_kafka, - client_id=test_config['consumer_id'], - ) - return producer - - -@pytest.fixture -def consumer_from_publisher(kafka_server: Tuple[Popen, int], - test_config: Dict) -> KafkaConsumer: +def consumer( + kafka_server: Tuple[Popen, int], test_config: Dict) -> KafkaConsumer: """Get a connected Kafka consumer. """ @@ -255,16 +216,3 @@ consumer.seek_to_beginning() return consumer - - -@pytest.fixture -def publisher(kafka_server: Tuple[Popen, int], - test_config: Dict) -> JournalPublisher: - """Test Publisher factory. We cannot use a fixture here as we need to - modify the sample. - - """ - # consumer and producer of the publisher needs to discuss with the - # right instance - publisher = JournalPublisherTest(test_config) - return publisher diff --git a/swh/journal/tests/test_backfill.py b/swh/journal/tests/test_backfill.py new file mode 100644 --- /dev/null +++ b/swh/journal/tests/test_backfill.py @@ -0,0 +1,123 @@ +# Copyright (C) 2019 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import pytest + +from swh.journal.backfill import ( + JournalBackfiller, compute_query, PARTITION_KEY +) + + +TEST_CONFIG = { + 'brokers': ['localhost'], + 'final_prefix': 'swh.tmp_journal.new', + 'client_id': 'swh.journal.publisher.test', + 'storage_dbconn': 'service=swh-dev', +} + + +def test_config_ko_missing_mandatory_key(): + """Missing configuration key will make the initialization fail + + """ + for key in TEST_CONFIG.keys(): + config = TEST_CONFIG.copy() + config.pop(key) + + with pytest.raises(ValueError) as e: + JournalBackfiller(config) + + error = ('Configuration error: The following keys must be' + ' provided: %s' % 
(','.join([key]), )) + assert e.value.args[0] == error + + +def test_config_ko_unknown_object_type(): + """Parse arguments will fail if the object type is unknown + + """ + backfiller = JournalBackfiller(TEST_CONFIG) + with pytest.raises(ValueError) as e: + backfiller.parse_arguments('unknown-object-type', 1, 2) + + error = ('Object type unknown-object-type is not supported. ' + 'The only possible values are %s' % ( + ', '.join(PARTITION_KEY))) + assert e.value.args[0] == error + + +def test_compute_query_content(): + query, where_args, column_aliases = compute_query( + 'content', '\x000000', '\x000001') + + assert where_args == ['\x000000', '\x000001'] + + assert column_aliases == [ + 'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', 'status', + 'ctime' + ] + + assert query == ''' +select sha1,sha1_git,sha256,blake2s256,length,status,ctime +from content + +where (sha1) >= %s and (sha1) < %s + ''' + + +def test_compute_query_skipped_content(): + query, where_args, column_aliases = compute_query( + 'skipped_content', None, None) + + assert where_args == [] + + assert column_aliases == [ + 'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', 'ctime', + 'status', 'reason', + ] + + assert query == ''' +select sha1,sha1_git,sha256,blake2s256,length,ctime,status,reason +from skipped_content + + + ''' + + +def test_compute_query_origin_visit(): + query, where_args, column_aliases = compute_query( + 'origin_visit', 1, 10) + + assert where_args == [1, 10] + + assert column_aliases == [ + 'type', 'url', 'date', 'snapshot', 'status' + ] + + assert query == ''' +select type,url,date,snapshot,status +from origin_visit +left join origin on origin_visit.origin=origin.id +where (origin_visit.origin) >= %s and (origin_visit.origin) < %s + ''' + + +def test_compute_query_release(): + query, where_args, column_aliases = compute_query( + 'release', '\x000002', '\x000003') + + assert where_args == ['\x000002', '\x000003'] + + assert column_aliases == [ + 'id', 'date', 
'date_offset', 'comment', 'name', 'synthetic', + 'date_neg_utc_offset', 'target', 'target_type', 'author_id', + 'author_name', 'author_email', 'author_fullname'] + + assert query == ''' +select release.id as id,date,date_offset,comment,release.name as name,synthetic,date_neg_utc_offset,target,target_type,a.id as author_id,a.name as author_name,a.email as author_email,a.fullname as author_fullname +from release +left join person a on release.author=a.id +where (release.id) >= %s and (release.id) < %s + ''' # noqa diff --git a/swh/journal/tests/test_direct_writer.py b/swh/journal/tests/test_direct_writer.py --- a/swh/journal/tests/test_direct_writer.py +++ b/swh/journal/tests/test_direct_writer.py @@ -48,7 +48,7 @@ def test_direct_writer( kafka_prefix: str, kafka_server: Tuple[Popen, int], - consumer_from_publisher: KafkaConsumer): + consumer: KafkaConsumer): kafka_prefix += '.swh.journal.objects' config = { @@ -67,13 +67,13 @@ object_ = {**object_, 'ctime': datetime.datetime.now()} writer.write_addition(object_type, object_) - assert_written(consumer_from_publisher, kafka_prefix) + assert_written(consumer, kafka_prefix) def test_storage_direct_writer( kafka_prefix: str, kafka_server: Tuple[Popen, int], - consumer_from_publisher: KafkaConsumer): + consumer: KafkaConsumer): kafka_prefix += '.swh.journal.objects' config = { @@ -102,4 +102,4 @@ else: assert False, object_type - assert_written(consumer_from_publisher, kafka_prefix) + assert_written(consumer, kafka_prefix) diff --git a/swh/journal/tests/test_publisher_kafka.py b/swh/journal/tests/test_publisher_kafka.py deleted file mode 100644 --- a/swh/journal/tests/test_publisher_kafka.py +++ /dev/null @@ -1,93 +0,0 @@ -# Copyright (C) 2018-2019 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - - -from kafka import KafkaConsumer, KafkaProducer 
-from subprocess import Popen -from typing import Tuple - -from swh.journal.serializers import value_to_kafka, kafka_to_value -from swh.journal.publisher import JournalPublisher - -from .conftest import OBJECT_TYPE_KEYS - - -def assert_publish_ok(publisher: JournalPublisher, - consumer_from_publisher: KafkaConsumer, - producer_to_publisher: KafkaProducer, - test_config: dict, - object_type: str): - """Assert that publishing object in the publisher is reified and - published in output topics. - - Args: - publisher (JournalPublisher): publisher to read and write data - consumer_from_publisher (KafkaConsumer): To read data from the - publisher - producer_to_publisher (KafkaProducer): To send data to the publisher - object_type (str): Object type to look for (e.g content, revision, - etc...) - - """ - # object type's id label key - object_key_id, expected_objects = OBJECT_TYPE_KEYS[object_type] - # objects to send to the publisher - if object_key_id: - objects = [{object_key_id: c[object_key_id]} - for c in expected_objects] - else: - # TODO: add support for origin and origin_visit - return - - # send message to the publisher - for obj in objects: - producer_to_publisher.send( - '%s.%s' % (test_config['temporary_prefix'], object_type), - obj - ) - - nb_messages = len(objects) - - for _ in range(nb_messages): - publisher.poll(max_messages=1) - - # then (client reads from the messages from output topic) - expected_topic = '%s.%s' % (test_config['final_prefix'], object_type) - expected_msgs = [ - ( - object_[object_key_id], - kafka_to_value(value_to_kafka(object_)) - ) - for object_ in expected_objects] - - msgs = list(consumer_from_publisher) - assert all(msg.topic == expected_topic for msg in msgs) - assert [(msg.key, msg.value) for msg in msgs] == expected_msgs - - -def test_publish( - publisher: JournalPublisher, - kafka_server: Tuple[Popen, int], - test_config: dict, - consumer_from_publisher: KafkaConsumer, - producer_to_publisher: KafkaProducer): - """ - Reading 
from and writing to the journal publisher should work (contents) - - Args: - journal_publisher (JournalPublisher): publisher to read and write data - consumer_from_publisher (KafkaConsumer): To read data from publisher - producer_to_publisher (KafkaProducer): To send data to publisher - - """ - # retrieve the object types we want to test - object_types = OBJECT_TYPE_KEYS.keys() - # Now for each object type, we'll send data to the publisher and - # check that data is indeed fetched and reified in the publisher's - # output topics - for object_type in object_types: - assert_publish_ok( - publisher, consumer_from_publisher, producer_to_publisher, - test_config, object_type) diff --git a/swh/journal/tests/test_publisher_no_kafka.py b/swh/journal/tests/test_publisher_no_kafka.py deleted file mode 100644 --- a/swh/journal/tests/test_publisher_no_kafka.py +++ /dev/null @@ -1,151 +0,0 @@ -# Copyright (C) 2018-2019 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import pytest -import unittest - -from .conftest import ( - JournalPublisherTest, TEST_CONFIG, - CONTENTS, REVISIONS, RELEASES, ORIGINS -) -from swh.journal.publisher import MANDATORY_KEYS - - -class JournalPublisherNoKafkaInMemoryStorage(JournalPublisherTest): - """A journal publisher with: - - no kafka dependency - - in-memory storage - - """ - def check_config(self, config): - """No need to check the configuration here as we do not use kafka - - """ - pass - - def _prepare_journal(self, config): - """No journal for now - - """ - pass - - -class TestPublisherNoKafka(unittest.TestCase): - """This tests only the part not using any kafka instance - - """ - def setUp(self): - self.publisher = JournalPublisherNoKafkaInMemoryStorage(TEST_CONFIG) - self.contents = [{b'sha1': c['sha1']} for c in CONTENTS] - self.revisions = [{b'id': c['id']} 
for c in REVISIONS] - self.releases = [{b'id': c['id']} for c in RELEASES] - # those needs id generation from the storage - # so initialization is different than other entities - self.origins = [{b'url': o['url'], - b'type': o['type']} - for o in self.publisher.origins] - self.origin_visits = [{b'origin': ov['origin'], - b'visit': ov['visit']} - for ov in self.publisher.origin_visits] - # full objects - storage = self.publisher.storage - ovs = [] - for ov in self.origin_visits: - _ov = storage.origin_visit_get_by( - ov[b'origin'], ov[b'visit']) - _ov['date'] = str(_ov['date']) - ovs.append(_ov) - self.expected_origin_visits = ovs - - def test_process_contents(self): - actual_contents = self.publisher.process_contents(self.contents) - expected_contents = [(c['sha1'], c) for c in CONTENTS] - self.assertEqual(actual_contents, expected_contents) - - def test_process_revisions(self): - actual_revisions = self.publisher.process_revisions(self.revisions) - expected_revisions = [(c['id'], c) for c in REVISIONS] - self.assertEqual(actual_revisions, expected_revisions) - - def test_process_releases(self): - actual_releases = self.publisher.process_releases(self.releases) - expected_releases = [(c['id'], c) for c in RELEASES] - self.assertEqual(actual_releases, expected_releases) - - def test_process_origins(self): - actual_origins = self.publisher.process_origins(self.origins) - expected_origins = [({'url': o[b'url'], 'type': o[b'type']}, - {'url': o[b'url'], 'type': o[b'type']}) - for o in self.origins] - self.assertEqual(actual_origins, expected_origins) - - def test_process_origin_visits(self): - actual_ovs = self.publisher.process_origin_visits(self.origin_visits) - expected_ovs = [((ov['origin'], ov['visit']), ov) - for ov in self.expected_origin_visits] - self.assertEqual(actual_ovs, expected_ovs) - - def test_process_objects(self): - messages = { - 'content': self.contents, - 'revision': self.revisions, - 'release': self.releases, - 'origin': self.origins, - 
'origin_visit': self.origin_visits, - } - - actual_objects = self.publisher.process_objects(messages) - - expected_contents = [(c['sha1'], c) for c in CONTENTS] - expected_revisions = [(c['id'], c) for c in REVISIONS] - expected_releases = [(c['id'], c) for c in RELEASES] - expected_origins = [(o, o) for o in ORIGINS] - expected_ovs = [((ov['origin'], ov['visit']), ov) - for ov in self.expected_origin_visits] - expected_objects = { - 'content': expected_contents, - 'revision': expected_revisions, - 'release': expected_releases, - 'origin': expected_origins, - 'origin_visit': expected_ovs, - } - - self.assertEqual(actual_objects, expected_objects) - - -class JournalPublisherCheckTest(JournalPublisherTest): - """A journal publisher with: - - no kafka dependency - - in-memory storage - - """ - def _prepare_journal(self, config): - """No journal for now - - """ - pass - - -def test_check_config_ok(test_config): - """Instantiate a publisher with the right config is fine - - """ - publisher = JournalPublisherCheckTest(test_config) - assert publisher is not None - - -def test_check_config_ko(test_config): - """Instantiate a publisher with the wrong config should raise - - """ - for k in MANDATORY_KEYS: - conf = test_config.copy() - conf.pop(k) - with pytest.raises(ValueError) as e: - JournalPublisherCheckTest(conf) - - error = ('Configuration error: The following keys must be' - ' provided: %s' % (','.join([k]), )) - assert e.value.args[0] == error