diff --git a/swh/journal/backfill.py b/swh/journal/backfill.py
index 2ade3e8..ab03608 100644
--- a/swh/journal/backfill.py
+++ b/swh/journal/backfill.py
@@ -1,463 +1,463 @@
 # Copyright (C) 2017-2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 """Module defining journal backfiller classes.

 The backfillers' goal is to produce back part or all of the objects
 from the storage to the journal topics.

 At the moment, a first naive implementation is the JournalBackfiller.
 It simply reads the objects from the storage and sends every object
 identifier back to the journal.

 """

 import logging

-from .direct_writer import DirectKafkaWriter
+from .writer.kafka import KafkaJournalWriter

 from swh.core.db import BaseDb
 from swh.storage.converters import db_to_release, db_to_revision


 logger = logging.getLogger(__name__)

 PARTITION_KEY = {
     'content': 'sha1',
     'skipped_content': None,  # unused
     'directory': 'id',
     'revision': 'revision.id',
     'release': 'release.id',
     'snapshot': 'id',
     'origin': 'id',
     'origin_visit': 'origin_visit.origin',
 }

 COLUMNS = {
     'content': [
         'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', 'status',
         'ctime'
     ],
     'skipped_content': [
         'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', 'ctime',
         'status', 'reason',
     ],
     'directory': ['id', 'dir_entries', 'file_entries', 'rev_entries'],
     'revision': [
         ("revision.id", "id"),
         "date",
         "date_offset",
         "committer_date",
         "committer_date_offset",
         "type",
         "directory",
         "message",
         "synthetic",
         "metadata",
         "date_neg_utc_offset",
         "committer_date_neg_utc_offset",
         ("array(select parent_id::bytea from revision_history rh "
          "where rh.id = revision.id order by rh.parent_rank asc)",
          "parents"),
         ("a.id", "author_id"),
         ("a.name", "author_name"),
         ("a.email", "author_email"),
         ("a.fullname", "author_fullname"),
         ("c.id", "committer_id"),
         ("c.name", "committer_name"),
         ("c.email", "committer_email"),
         ("c.fullname", "committer_fullname"),
     ],
     'release': [
         ("release.id", "id"),
         "date",
         "date_offset",
         "comment",
         ("release.name", "name"),
         "synthetic",
         "date_neg_utc_offset",
         "target",
         "target_type",
         ("a.id", "author_id"),
         ("a.name", "author_name"),
         ("a.email", "author_email"),
         ("a.fullname", "author_fullname"),
     ],
     'snapshot': ['id', 'object_id'],
     'origin': ['type', 'url'],
     'origin_visit': ['visit', 'origin.type', 'origin_visit.type', 'url',
                      'date', 'snapshot', 'status', 'metadata'],
 }

 JOINS = {
     'release': ['person a on release.author=a.id'],
     'revision': ['person a on revision.author=a.id',
                  'person c on revision.committer=c.id'],
     'origin_visit': ['origin on origin_visit.origin=origin.id'],
 }


 def directory_converter(db, directory):
     """Convert directory from the flat representation to swh model
     compatible objects.

     """
     columns = ['target', 'name', 'perms']
     query_template = '''
     select %(columns)s
     from directory_entry_%(type)s
     where id in %%s
     '''

     types = ['file', 'dir', 'rev']

     entries = []
     with db.cursor() as cur:
         for type in types:
             ids = directory.pop('%s_entries' % type)
             if not ids:
                 continue
             query = query_template % {
                 'columns': ','.join(columns),
                 'type': type,
             }
             cur.execute(query, (tuple(ids), ))
             for row in cur:
                 entry = dict(zip(columns, row))
                 entry['type'] = type
                 entries.append(entry)

     directory['entries'] = entries
     return directory


 def revision_converter(db, revision):
     """Convert revision from the flat representation to swh model
     compatible objects.
""" revision = db_to_revision(revision) if 'author' in revision and revision['author']: del revision['author']['id'] if 'committer' in revision and revision['committer']: del revision['committer']['id'] return revision def release_converter(db, release): """Convert release from the flat representation to swh model compatible objects. """ release = db_to_release(release) if 'author' in release and release['author']: del release['author']['id'] return release def snapshot_converter(db, snapshot): """Convert snapshot from the flat representation to swh model compatible objects. """ columns = ['name', 'target', 'target_type'] query = ''' select %s from snapshot_branches sbs inner join snapshot_branch sb on sb.object_id=sbs.branch_id where sbs.snapshot_id=%%s ''' % ', '.join(columns) with db.cursor() as cur: cur.execute(query, (snapshot.pop('object_id'), )) branches = {} for name, *row in cur: branch = dict(zip(columns[1:], row)) if not branch['target'] and not branch['target_type']: branch = None branches[name] = branch snapshot['branches'] = branches return snapshot def origin_visit_converter(db, origin_visit): origin = { 'type': origin_visit.pop('origin.type'), 'url': origin_visit.pop('url'), } origin_visit['origin'] = origin origin_visit['type'] = origin_visit.pop('origin_visit.type') return origin_visit CONVERTERS = { 'directory': directory_converter, 'revision': revision_converter, 'release': release_converter, 'snapshot': snapshot_converter, 'origin_visit': origin_visit_converter, } def object_to_offset(object_id, numbits): """Compute the index of the range containing object id, when dividing space into 2^numbits. Args: object_id (str): The hex representation of object_id numbits (int): Number of bits in which we divide input space Returns: The index of the range containing object id """ q, r = divmod(numbits, 8) length = q + (r != 0) shift_bits = 8 - r if r else 0 truncated_id = object_id[:length * 2] if len(truncated_id) < length * 2: truncated_id += '0' * (length * 2 - len(truncated_id)) truncated_id_bytes = bytes.fromhex(truncated_id) return int.from_bytes(truncated_id_bytes, byteorder='big') >> shift_bits def byte_ranges(numbits, start_object=None, end_object=None): """Generate start/end pairs of bytes spanning numbits bits and constrained by optional start_object and end_object. 

     Args:
         numbits (int): Number of bits in which we divide input space
         start_object (str): Hex object id contained in the first range
             returned
         end_object (str): Hex object id contained in the last range
             returned

     Yields:
         2^numbits pairs of bytes

     """
     q, r = divmod(numbits, 8)
     length = q + (r != 0)
     shift_bits = 8 - r if r else 0

     def to_bytes(i):
         return int.to_bytes(i << shift_bits, length=length, byteorder='big')

     start_offset = 0
     end_offset = 1 << numbits
     if start_object is not None:
         start_offset = object_to_offset(start_object, numbits)
     if end_object is not None:
         end_offset = object_to_offset(end_object, numbits) + 1

     for start in range(start_offset, end_offset):
         end = start + 1
         if start == 0:
             yield None, to_bytes(end)
         elif end == 1 << numbits:
             yield to_bytes(start), None
         else:
             yield to_bytes(start), to_bytes(end)


 def integer_ranges(start, end, block_size=1000):
     for start in range(start, end, block_size):
         if start == 0:
             yield None, block_size
         elif start + block_size > end:
             yield start, end
         else:
             yield start, start + block_size


 RANGE_GENERATORS = {
     'content': lambda start, end: byte_ranges(24, start, end),
     'skipped_content': lambda start, end: [(None, None)],
     'directory': lambda start, end: byte_ranges(24, start, end),
     'revision': lambda start, end: byte_ranges(24, start, end),
     'release': lambda start, end: byte_ranges(16, start, end),
     'snapshot': lambda start, end: byte_ranges(16, start, end),
     'origin': integer_ranges,
     'origin_visit': integer_ranges,
 }


 def compute_query(obj_type, start, end):
     columns = COLUMNS.get(obj_type)
     join_specs = JOINS.get(obj_type, [])
     join_clause = '\n'.join('left join %s' % clause for clause in join_specs)

     where = []
     where_args = []
     if start:
         where.append('%(keys)s >= %%s')
         where_args.append(start)
     if end:
         where.append('%(keys)s < %%s')
         where_args.append(end)

     where_clause = ''
     if where:
         where_clause = ('where ' + ' and '.join(where)) % {
             'keys': '(%s)' % PARTITION_KEY[obj_type]
         }

     column_specs = []
     column_aliases = []
     for column in columns:
         if isinstance(column, str):
             column_specs.append(column)
             column_aliases.append(column)
         else:
             column_specs.append('%s as %s' % column)
             column_aliases.append(column[1])

     query = '''
 select %(columns)s
 from %(table)s
 %(join)s
 %(where)s
     ''' % {
         'columns': ','.join(column_specs),
         'table': obj_type,
         'join': join_clause,
         'where': where_clause,
     }

     return query, where_args, column_aliases


 def fetch(db, obj_type, start, end):
     """Fetch all obj_type's identifiers from db.

     This opens one connection, streams objects and, when done, closes
     the connection.

     Args:
         db (BaseDb): Db connection object
         obj_type (str): Object type
         start (Union[bytes|Tuple]): Range start identifier
         end (Union[bytes|Tuple]): Range end identifier

     Raises:
         ValueError if obj_type is not supported

     Yields:
         Objects in the given range

     """
     query, where_args, column_aliases = compute_query(obj_type, start, end)
     converter = CONVERTERS.get(obj_type)
     with db.cursor() as cursor:
         logger.debug('Fetching data for table %s', obj_type)
         logger.debug('query: %s %s', query, where_args)
         cursor.execute(query, where_args)
         for row in cursor:
             record = dict(zip(column_aliases, row))
             if converter:
                 record = converter(db, record)

             logger.debug('record: %s' % record)
             yield record


 def _format_range_bound(bound):
     if isinstance(bound, bytes):
         return bound.hex()
     else:
         return str(bound)


 MANDATORY_KEYS = ['brokers', 'storage_dbconn', 'prefix', 'client_id']


 class JournalBackfiller:
     """Class in charge of reading the storage's objects and sending them
     back to the journal's topics.

     This is designed to be run periodically.
""" def __init__(self, config=None): self.config = config self.check_config(config) def check_config(self, config): missing_keys = [] for key in MANDATORY_KEYS: if not config.get(key): missing_keys.append(key) if missing_keys: raise ValueError( 'Configuration error: The following keys must be' ' provided: %s' % (','.join(missing_keys), )) def parse_arguments(self, object_type, start_object, end_object): """Parse arguments Raises: ValueError for unsupported object type ValueError if object ids are not parseable Returns: Parsed start and end object ids """ if object_type not in COLUMNS: raise ValueError('Object type %s is not supported. ' 'The only possible values are %s' % ( object_type, ', '.join(COLUMNS.keys()))) if object_type in ['origin', 'origin_visit']: if start_object: start_object = int(start_object) else: start_object = 0 if end_object: end_object = int(end_object) else: end_object = 100 * 1000 * 1000 # hard-coded limit return start_object, end_object def run(self, object_type, start_object, end_object, dry_run=False): """Reads storage's subscribed object types and send them to the journal's reading topic. """ start_object, end_object = self.parse_arguments( object_type, start_object, end_object) db = BaseDb.connect(self.config['storage_dbconn']) - writer = DirectKafkaWriter( + writer = KafkaJournalWriter( brokers=self.config['brokers'], prefix=self.config['prefix'], client_id=self.config['client_id'] ) for range_start, range_end in RANGE_GENERATORS[object_type]( start_object, end_object): logger.info('Processing %s range %s to %s', object_type, _format_range_bound(range_start), _format_range_bound(range_end)) for obj in fetch( db, object_type, start=range_start, end=range_end, ): if dry_run: continue writer.write_addition(object_type=object_type, object_=obj) writer.producer.flush() if __name__ == '__main__': print('Please use the "swh-journal backfiller run" command') diff --git a/swh/journal/direct_writer.py b/swh/journal/direct_writer.py index 7a6aa17..2f6904c 100644 --- a/swh/journal/direct_writer.py +++ b/swh/journal/direct_writer.py @@ -1,7 +1,7 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # for BW compat -from .writer.direct import DirectKafkaWriter # noqa +from .writer.kafka import KafkaJournalWriter as DirectKafkaWriter # noqa diff --git a/swh/journal/tests/test_direct_writer.py b/swh/journal/tests/test_kafka_writer.py similarity index 95% rename from swh/journal/tests/test_direct_writer.py rename to swh/journal/tests/test_kafka_writer.py index c06a1a6..823a892 100644 --- a/swh/journal/tests/test_direct_writer.py +++ b/swh/journal/tests/test_kafka_writer.py @@ -1,136 +1,136 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict import datetime from confluent_kafka import Consumer, KafkaException from subprocess import Popen from typing import Tuple from swh.storage import get_storage -from swh.journal.direct_writer import DirectKafkaWriter +from swh.journal.writer.kafka import KafkaJournalWriter from swh.journal.serializers import ( kafka_to_key, kafka_to_value ) from .conftest import OBJECT_TYPE_KEYS def assert_written(consumer, 
diff --git a/swh/journal/direct_writer.py b/swh/journal/direct_writer.py
index 7a6aa17..2f6904c 100644
--- a/swh/journal/direct_writer.py
+++ b/swh/journal/direct_writer.py
@@ -1,7 +1,7 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 # for BW compat
-from .writer.direct import DirectKafkaWriter  # noqa
+from .writer.kafka import KafkaJournalWriter as DirectKafkaWriter  # noqa
diff --git a/swh/journal/tests/test_direct_writer.py b/swh/journal/tests/test_kafka_writer.py
similarity index 95%
rename from swh/journal/tests/test_direct_writer.py
rename to swh/journal/tests/test_kafka_writer.py
index c06a1a6..823a892 100644
--- a/swh/journal/tests/test_direct_writer.py
+++ b/swh/journal/tests/test_kafka_writer.py
@@ -1,136 +1,136 @@
 # Copyright (C) 2018-2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from collections import defaultdict
 import datetime

 from confluent_kafka import Consumer, KafkaException

 from subprocess import Popen
 from typing import Tuple

 from swh.storage import get_storage

-from swh.journal.direct_writer import DirectKafkaWriter
+from swh.journal.writer.kafka import KafkaJournalWriter
 from swh.journal.serializers import (
     kafka_to_key, kafka_to_value
 )

 from .conftest import OBJECT_TYPE_KEYS


 def assert_written(consumer, kafka_prefix, expected_messages):
     consumed_objects = defaultdict(list)
     fetched_messages = 0
     retries_left = 1000

     while fetched_messages < expected_messages:
         if retries_left == 0:
             raise ValueError('Timed out fetching messages from kafka')

         msg = consumer.poll(timeout=0.01)
         if not msg:
             retries_left -= 1
             continue

         error = msg.error()
         if error is not None:
             if error.fatal():
                 raise KafkaException(error)
             retries_left -= 1
             continue

         fetched_messages += 1
         consumed_objects[msg.topic()].append(
             (kafka_to_key(msg.key()), kafka_to_value(msg.value()))
         )

     for (object_type, (key_name, objects)) in OBJECT_TYPE_KEYS.items():
         topic = kafka_prefix + '.' + object_type
         (keys, values) = zip(*consumed_objects[topic])
         if key_name:
             assert list(keys) == [object_[key_name] for object_ in objects]
         else:
             pass  # TODO

         if object_type == 'origin_visit':
             for value in values:
                 del value['visit']
         elif object_type == 'content':
             for value in values:
                 del value['ctime']

         for object_ in objects:
             assert object_ in values


-def test_direct_writer(
+def test_kafka_writer(
         kafka_prefix: str,
         kafka_server: Tuple[Popen, int],
         consumer: Consumer):
     kafka_prefix += '.swh.journal.objects'

     config = {
         'brokers': 'localhost:%d' % kafka_server[1],
-        'client_id': 'direct_writer',
+        'client_id': 'kafka_writer',
         'prefix': kafka_prefix,
     }

-    writer = DirectKafkaWriter(**config)
+    writer = KafkaJournalWriter(**config)

     expected_messages = 0

     for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items():
         for (num, object_) in enumerate(objects):
             if object_type == 'origin_visit':
                 object_ = {**object_, 'visit': num}
             if object_type == 'content':
                 object_ = {**object_, 'ctime': datetime.datetime.now()}
             writer.write_addition(object_type, object_)
             expected_messages += 1

     assert_written(consumer, kafka_prefix, expected_messages)


 def test_storage_direct_writer(
         kafka_prefix: str,
         kafka_server: Tuple[Popen, int],
         consumer: Consumer):
     kafka_prefix += '.swh.journal.objects'

     config = {
         'brokers': 'localhost:%d' % kafka_server[1],
-        'client_id': 'direct_writer',
+        'client_id': 'kafka_writer',
         'prefix': kafka_prefix,
     }

     storage = get_storage('memory', {'journal_writer': {
         'cls': 'kafka', 'args': config}})

     expected_messages = 0

     for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items():
         method = getattr(storage, object_type + '_add')
         if object_type in ('content', 'directory', 'revision', 'release',
                            'snapshot', 'origin'):
             if object_type == 'content':
                 objects = [{**obj, 'data': b''} for obj in objects]
             method(objects)
             expected_messages += len(objects)
         elif object_type in ('origin_visit',):
             for object_ in objects:
                 object_ = object_.copy()
                 origin_id = storage.origin_add_one(object_.pop('origin'))
                 del object_['type']
                 visit = method(origin=origin_id, date=object_.pop('date'))
                 expected_messages += 1

                 visit_id = visit['visit']
                 storage.origin_visit_update(origin_id, visit_id, **object_)
                 expected_messages += 1
         else:
             assert False, object_type

     assert_written(consumer, kafka_prefix, expected_messages)
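Note (not part of the patch): test_storage_direct_writer above relies on swh-storage instantiating the journal writer itself from its 'journal_writer' configuration block, so every *_add call is mirrored to Kafka before reaching the storage backend. A minimal sketch of that wiring follows; broker, prefix and client id are placeholder values, and actually producing messages requires a reachable Kafka broker.

# Sketch of the storage-side configuration exercised by the test above.
from swh.storage import get_storage

storage = get_storage('memory', {
    'journal_writer': {
        'cls': 'kafka',
        'args': {
            'brokers': 'localhost:9092',          # placeholder
            'prefix': 'swh.journal.objects',      # placeholder topic prefix
            'client_id': 'swh.journal.example',   # placeholder
        },
    },
})

# With a reachable broker, this also produces one message per object on
# the 'swh.journal.objects.origin' topic.
storage.origin_add([{'url': 'https://example.org/repo.git', 'type': 'git'}])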
diff --git a/swh/journal/tests/utils.py b/swh/journal/tests/utils.py
index 5e54876..4007ba6 100644
--- a/swh/journal/tests/utils.py
+++ b/swh/journal/tests/utils.py
@@ -1,58 +1,58 @@
 from swh.journal.client import JournalClient, ACCEPTED_OBJECT_TYPES
-from swh.journal.direct_writer import DirectKafkaWriter
+from swh.journal.writer.kafka import KafkaJournalWriter
 from swh.journal.serializers import (kafka_to_value, key_to_kafka,
                                      value_to_kafka)


 class FakeKafkaMessage:
     def __init__(self, topic, key, value):
         self._topic = topic
         self._key = key_to_kafka(key)
         self._value = value_to_kafka(value)

     def topic(self):
         return self._topic

     def value(self):
         return self._value

     def key(self):
         return self._key

     def error(self):
         return None


-class MockedKafkaWriter(DirectKafkaWriter):
+class MockedKafkaWriter(KafkaJournalWriter):
     def __init__(self, queue):
         self._prefix = 'prefix'
         self.queue = queue

     def send(self, topic, key, value):
         msg = FakeKafkaMessage(topic=topic, key=key, value=value)
         self.queue.append(msg)

     def flush(self):
         pass


 class MockedKafkaConsumer:
     def __init__(self, queue):
         self.queue = queue
         self.committed = False

     def poll(self, timeout=None):
         return self.queue.pop(0)

     def commit(self):
         if self.queue == []:
             self.committed = True


 class MockedJournalClient(JournalClient):
     def __init__(self, queue, object_types=ACCEPTED_OBJECT_TYPES):
         self._object_types = object_types
         self.consumer = MockedKafkaConsumer(queue)
         self.process_timeout = 0
         self.max_messages = 0
         self.value_deserializer = kafka_to_value
diff --git a/swh/journal/writer/__init__.py b/swh/journal/writer/__init__.py
index 435c401..fb2ea5f 100644
--- a/swh/journal/writer/__init__.py
+++ b/swh/journal/writer/__init__.py
@@ -1,15 +1,15 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information


 def get_journal_writer(cls, args={}):
     if cls == 'inmemory':
         from .inmemory import InMemoryJournalWriter as JournalWriter
     elif cls == 'kafka':
-        from .direct import DirectKafkaWriter as JournalWriter
+        from .kafka import KafkaJournalWriter as JournalWriter
     else:
-        raise ValueError('Unknown storage class `%s`' % cls)
+        raise ValueError('Unknown journal writer class `%s`' % cls)

     return JournalWriter(**args)
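Note (not part of the patch): after this change the factory maps 'kafka' to KafkaJournalWriter and 'inmemory' to InMemoryJournalWriter, any other value raising ValueError, while the old swh.journal.direct_writer module keeps exporting the DirectKafkaWriter name as an alias. A minimal sketch, with placeholder broker and client id values:

# Sketch of the factory and of the backward-compatibility alias shown above.
from swh.journal.writer import get_journal_writer

writer = get_journal_writer('kafka', args={
    'brokers': 'localhost:9092',          # placeholder
    'prefix': 'swh.journal.objects',      # placeholder
    'client_id': 'swh.journal.example',   # placeholder
})

# The legacy import path still resolves to the renamed class.
from swh.journal.direct_writer import DirectKafkaWriter
assert isinstance(writer, DirectKafkaWriter)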
diff --git a/swh/journal/writer/direct.py b/swh/journal/writer/kafka.py
similarity index 99%
rename from swh/journal/writer/direct.py
rename to swh/journal/writer/kafka.py
index b9bf169..e09e181 100644
--- a/swh/journal/writer/direct.py
+++ b/swh/journal/writer/kafka.py
@@ -1,94 +1,94 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import logging

 from confluent_kafka import Producer

 from swh.model.hashutil import DEFAULT_ALGORITHMS
 from swh.model.model import BaseModel

 from swh.journal.serializers import key_to_kafka, value_to_kafka

 logger = logging.getLogger(__name__)


-class DirectKafkaWriter:
+class KafkaJournalWriter:
     """This class is instantiated and used by swh-storage to write incoming
     new objects to Kafka before adding them to the storage backend
     (eg. postgresql) itself."""

     def __init__(self, brokers, prefix, client_id):
         self._prefix = prefix

         self.producer = Producer({
             'bootstrap.servers': brokers,
             'client.id': client_id,
             'enable.idempotence': 'true',
         })

     def send(self, topic, key, value):
         self.producer.produce(
             topic=topic, key=key_to_kafka(key),
             value=value_to_kafka(value),
         )

     def flush(self):
         self.producer.flush()
         # Need to service the callbacks regularly by calling poll
         self.producer.poll(0)

     def _get_key(self, object_type, object_):
         if object_type in ('revision', 'release', 'directory', 'snapshot'):
             return object_['id']
         elif object_type == 'content':
             return object_['sha1']  # TODO: use a dict of hashes
         elif object_type == 'skipped_content':
             return {
                 hash: object_[hash]
                 for hash in DEFAULT_ALGORITHMS
             }
         elif object_type == 'origin':
             return {'url': object_['url'], 'type': object_['type']}
         elif object_type == 'origin_visit':
             return {
                 'origin': object_['origin'],
                 'date': str(object_['date']),
             }
         else:
             raise ValueError('Unknown object type: %s.' % object_type)

     def _sanitize_object(self, object_type, object_):
         if object_type == 'origin_visit':
             return {
                 **object_,
                 'date': str(object_['date']),
             }
         elif object_type == 'origin':
             assert 'id' not in object_
         return object_

     def write_addition(self, object_type, object_, flush=True):
         """Write a single object to the journal"""
         if isinstance(object_, BaseModel):
             object_ = object_.to_dict()
         topic = '%s.%s' % (self._prefix, object_type)
         key = self._get_key(object_type, object_)
         object_ = self._sanitize_object(object_type, object_)
         logger.debug('topic: %s, key: %s, value: %s', topic, key, object_)
         self.send(topic, key=key, value=object_)

         if flush:
             self.flush()

     write_update = write_addition

     def write_additions(self, object_type, objects, flush=True):
         """Write a set of objects to the journal"""
         for object_ in objects:
             self.write_addition(object_type, object_, flush=False)

         if flush:
             self.flush()
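Note (not part of the patch): a minimal usage sketch of the renamed writer, assuming a reachable Kafka broker; broker address, prefix and client id are placeholder values. Messages go to the '<prefix>.<object_type>' topic, keyed as computed by _get_key(), and serialized with value_to_kafka().

from swh.journal.writer.kafka import KafkaJournalWriter

writer = KafkaJournalWriter(
    brokers='localhost:9092',            # placeholder
    prefix='swh.journal.objects',        # placeholder
    client_id='swh.journal.example',     # placeholder
)

origin = {'url': 'https://example.org/repo.git', 'type': 'git'}

# Produced on topic 'swh.journal.objects.origin', keyed by
# {'url': ..., 'type': ...}; write_addition() flushes by default.
writer.write_addition('origin', origin)

# Batch form: a single flush after all objects have been produced.
writer.write_additions('origin', [origin])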