diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -30,10 +30,14 @@
 [mypy-tenacity.*]
 ignore_missing_imports = True
 
-# temporary work-around for landing typing support in spite of the current
-# journal<->storage dependency loop
-[mypy-swh.journal.*]
+[mypy-pytest_postgresql.*]
 ignore_missing_imports = True
 
-[mypy-pytest_postgresql.*]
+[mypy-confluent_kafka.*]
+ignore_missing_imports = True
+
+[mypy-msgpack.*]
+ignore_missing_imports = True
+
+[mypy-systemd.daemon.*]
 ignore_missing_imports = True
diff --git a/requirements-swh-journal.txt b/requirements-swh-journal.txt
--- a/requirements-swh-journal.txt
+++ b/requirements-swh-journal.txt
@@ -1 +1 @@
-swh.journal >= 0.0.17
+#swh.journal >= 0.0.17
diff --git a/swh/journal/__init__.py b/swh/journal/__init__.py
new file mode 100644
--- /dev/null
+++ b/swh/journal/__init__.py
@@ -0,0 +1,7 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+# the default prefix for kafka's topics
+DEFAULT_PREFIX = 'swh.journal.objects'
diff --git a/swh/journal/backfill.py b/swh/journal/backfill.py
new file mode 100644
--- /dev/null
+++ b/swh/journal/backfill.py
@@ -0,0 +1,458 @@
+# Copyright (C) 2017-2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""Module defining journal backfiller classes.
+
+The goal of these backfillers is to send back part or all of the objects
+from the storage to the journal topics.
+
+At the moment, a first naive implementation is the
+JournalBackfiller. It simply reads the objects from the
+storage and sends every object identifier back to the journal.
+ +""" + +import logging + +from .writer.kafka import KafkaJournalWriter + +from swh.core.db import BaseDb +from swh.storage.converters import db_to_release, db_to_revision + + +logger = logging.getLogger(__name__) + +PARTITION_KEY = { + 'content': 'sha1', + 'skipped_content': None, # unused + 'directory': 'id', + 'revision': 'revision.id', + 'release': 'release.id', + 'snapshot': 'id', + 'origin': 'id', + 'origin_visit': 'origin_visit.origin', +} + +COLUMNS = { + 'content': [ + 'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', 'status', + 'ctime' + ], + 'skipped_content': [ + 'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', 'ctime', + 'status', 'reason', + ], + 'directory': ['id', 'dir_entries', 'file_entries', 'rev_entries'], + 'revision': [ + ("revision.id", "id"), + "date", + "date_offset", + "committer_date", + "committer_date_offset", + "type", + "directory", + "message", + "synthetic", + "metadata", + "date_neg_utc_offset", + "committer_date_neg_utc_offset", + ("array(select parent_id::bytea from revision_history rh " + "where rh.id = revision.id order by rh.parent_rank asc)", + "parents"), + ("a.id", "author_id"), + ("a.name", "author_name"), + ("a.email", "author_email"), + ("a.fullname", "author_fullname"), + ("c.id", "committer_id"), + ("c.name", "committer_name"), + ("c.email", "committer_email"), + ("c.fullname", "committer_fullname"), + ], + 'release': [ + ("release.id", "id"), + "date", + "date_offset", + "comment", + ("release.name", "name"), + "synthetic", + "date_neg_utc_offset", + "target", + "target_type", + ("a.id", "author_id"), + ("a.name", "author_name"), + ("a.email", "author_email"), + ("a.fullname", "author_fullname"), + ], + 'snapshot': ['id', 'object_id'], + 'origin': ['type', 'url'], + 'origin_visit': ['visit', 'origin.type', 'origin_visit.type', + 'url', 'date', 'snapshot', 'status', 'metadata'], +} + + +JOINS = { + 'release': ['person a on release.author=a.id'], + 'revision': ['person a on revision.author=a.id', + 'person c on revision.committer=c.id'], + 'origin_visit': ['origin on origin_visit.origin=origin.id'], +} + + +def directory_converter(db, directory): + """Convert directory from the flat representation to swh model + compatible objects. + + """ + columns = ['target', 'name', 'perms'] + query_template = ''' + select %(columns)s + from directory_entry_%(type)s + where id in %%s + ''' + + types = ['file', 'dir', 'rev'] + + entries = [] + with db.cursor() as cur: + for type in types: + ids = directory.pop('%s_entries' % type) + if not ids: + continue + query = query_template % { + 'columns': ','.join(columns), + 'type': type, + } + cur.execute(query, (tuple(ids), )) + for row in cur: + entry = dict(zip(columns, row)) + entry['type'] = type + entries.append(entry) + + directory['entries'] = entries + return directory + + +def revision_converter(db, revision): + """Convert revision from the flat representation to swh model + compatible objects. + + """ + return db_to_revision(revision) + + +def release_converter(db, release): + """Convert release from the flat representation to swh model + compatible objects. + + """ + release = db_to_release(release) + if 'author' in release and release['author']: + del release['author']['id'] + return release + + +def snapshot_converter(db, snapshot): + """Convert snapshot from the flat representation to swh model + compatible objects. 
+ + """ + columns = ['name', 'target', 'target_type'] + query = ''' + select %s + from snapshot_branches sbs + inner join snapshot_branch sb on sb.object_id=sbs.branch_id + where sbs.snapshot_id=%%s + ''' % ', '.join(columns) + with db.cursor() as cur: + cur.execute(query, (snapshot.pop('object_id'), )) + branches = {} + for name, *row in cur: + branch = dict(zip(columns[1:], row)) + if not branch['target'] and not branch['target_type']: + branch = None + branches[name] = branch + + snapshot['branches'] = branches + return snapshot + + +def origin_visit_converter(db, origin_visit): + origin = { + 'type': origin_visit.pop('origin.type'), + 'url': origin_visit.pop('url'), + } + origin_visit['origin'] = origin + origin_visit['type'] = origin_visit.pop('origin_visit.type') + return origin_visit + + +CONVERTERS = { + 'directory': directory_converter, + 'revision': revision_converter, + 'release': release_converter, + 'snapshot': snapshot_converter, + 'origin_visit': origin_visit_converter, +} + + +def object_to_offset(object_id, numbits): + """Compute the index of the range containing object id, when dividing + space into 2^numbits. + + Args: + object_id (str): The hex representation of object_id + numbits (int): Number of bits in which we divide input space + + Returns: + The index of the range containing object id + + """ + q, r = divmod(numbits, 8) + length = q + (r != 0) + shift_bits = 8 - r if r else 0 + + truncated_id = object_id[:length * 2] + if len(truncated_id) < length * 2: + truncated_id += '0' * (length * 2 - len(truncated_id)) + + truncated_id_bytes = bytes.fromhex(truncated_id) + return int.from_bytes(truncated_id_bytes, byteorder='big') >> shift_bits + + +def byte_ranges(numbits, start_object=None, end_object=None): + """Generate start/end pairs of bytes spanning numbits bits and + constrained by optional start_object and end_object. 
+ + Args: + numbits (int): Number of bits in which we divide input space + start_object (str): Hex object id contained in the first range + returned + end_object (str): Hex object id contained in the last range + returned + + Yields: + 2^numbits pairs of bytes + + """ + q, r = divmod(numbits, 8) + length = q + (r != 0) + shift_bits = 8 - r if r else 0 + + def to_bytes(i): + return int.to_bytes(i << shift_bits, length=length, byteorder='big') + + start_offset = 0 + end_offset = 1 << numbits + + if start_object is not None: + start_offset = object_to_offset(start_object, numbits) + if end_object is not None: + end_offset = object_to_offset(end_object, numbits) + 1 + + for start in range(start_offset, end_offset): + end = start + 1 + + if start == 0: + yield None, to_bytes(end) + elif end == 1 << numbits: + yield to_bytes(start), None + else: + yield to_bytes(start), to_bytes(end) + + +def integer_ranges(start, end, block_size=1000): + for start in range(start, end, block_size): + if start == 0: + yield None, block_size + elif start + block_size > end: + yield start, end + else: + yield start, start + block_size + + +RANGE_GENERATORS = { + 'content': lambda start, end: byte_ranges(24, start, end), + 'skipped_content': lambda start, end: [(None, None)], + 'directory': lambda start, end: byte_ranges(24, start, end), + 'revision': lambda start, end: byte_ranges(24, start, end), + 'release': lambda start, end: byte_ranges(16, start, end), + 'snapshot': lambda start, end: byte_ranges(16, start, end), + 'origin': integer_ranges, + 'origin_visit': integer_ranges, +} + + +def compute_query(obj_type, start, end): + columns = COLUMNS.get(obj_type) + join_specs = JOINS.get(obj_type, []) + join_clause = '\n'.join('left join %s' % clause for clause in join_specs) + + where = [] + where_args = [] + if start: + where.append('%(keys)s >= %%s') + where_args.append(start) + if end: + where.append('%(keys)s < %%s') + where_args.append(end) + + where_clause = '' + if where: + where_clause = ('where ' + ' and '.join(where)) % { + 'keys': '(%s)' % PARTITION_KEY[obj_type] + } + + column_specs = [] + column_aliases = [] + for column in columns: + if isinstance(column, str): + column_specs.append(column) + column_aliases.append(column) + else: + column_specs.append('%s as %s' % column) + column_aliases.append(column[1]) + + query = ''' +select %(columns)s +from %(table)s +%(join)s +%(where)s + ''' % { + 'columns': ','.join(column_specs), + 'table': obj_type, + 'join': join_clause, + 'where': where_clause, + } + + return query, where_args, column_aliases + + +def fetch(db, obj_type, start, end): + """Fetch all obj_type's identifiers from db. + + This opens one connection, stream objects and when done, close + the connection. 
+ + Args: + db (BaseDb): Db connection object + obj_type (str): Object type + start (Union[bytes|Tuple]): Range start identifier + end (Union[bytes|Tuple]): Range end identifier + + Raises: + ValueError if obj_type is not supported + + Yields: + Objects in the given range + + """ + query, where_args, column_aliases = compute_query(obj_type, start, end) + converter = CONVERTERS.get(obj_type) + with db.cursor() as cursor: + logger.debug('Fetching data for table %s', obj_type) + logger.debug('query: %s %s', query, where_args) + cursor.execute(query, where_args) + for row in cursor: + record = dict(zip(column_aliases, row)) + if converter: + record = converter(db, record) + + logger.debug('record: %s' % record) + yield record + + +def _format_range_bound(bound): + if isinstance(bound, bytes): + return bound.hex() + else: + return str(bound) + + +MANDATORY_KEYS = ['brokers', 'storage_dbconn', 'prefix', 'client_id'] + + +class JournalBackfiller: + """Class in charge of reading the storage's objects and sends those + back to the journal's topics. + + This is designed to be run periodically. + + """ + def __init__(self, config=None): + self.config = config + self.check_config(config) + + def check_config(self, config): + missing_keys = [] + for key in MANDATORY_KEYS: + if not config.get(key): + missing_keys.append(key) + + if missing_keys: + raise ValueError( + 'Configuration error: The following keys must be' + ' provided: %s' % (','.join(missing_keys), )) + + def parse_arguments(self, object_type, start_object, end_object): + """Parse arguments + + Raises: + ValueError for unsupported object type + ValueError if object ids are not parseable + + Returns: + Parsed start and end object ids + + """ + if object_type not in COLUMNS: + raise ValueError('Object type %s is not supported. ' + 'The only possible values are %s' % ( + object_type, ', '.join(COLUMNS.keys()))) + + if object_type in ['origin', 'origin_visit']: + if start_object: + start_object = int(start_object) + else: + start_object = 0 + if end_object: + end_object = int(end_object) + else: + end_object = 100 * 1000 * 1000 # hard-coded limit + + return start_object, end_object + + def run(self, object_type, start_object, end_object, dry_run=False): + """Reads storage's subscribed object types and send them to the + journal's reading topic. 
+ + """ + start_object, end_object = self.parse_arguments( + object_type, start_object, end_object) + + db = BaseDb.connect(self.config['storage_dbconn']) + writer = KafkaJournalWriter( + brokers=self.config['brokers'], + prefix=self.config['prefix'], + client_id=self.config['client_id'] + ) + for range_start, range_end in RANGE_GENERATORS[object_type]( + start_object, end_object): + logger.info('Processing %s range %s to %s', object_type, + _format_range_bound(range_start), + _format_range_bound(range_end)) + + for obj in fetch( + db, object_type, start=range_start, end=range_end, + ): + if dry_run: + continue + writer.write_addition(object_type=object_type, + object_=obj) + + writer.producer.flush() + + +if __name__ == '__main__': + print('Please use the "swh-journal backfiller run" command') diff --git a/swh/journal/cli.py b/swh/journal/cli.py new file mode 100644 --- /dev/null +++ b/swh/journal/cli.py @@ -0,0 +1,229 @@ +# Copyright (C) 2016-2019 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import functools +import logging +import mmap +import os + +import click + +try: + from systemd.daemon import notify +except ImportError: + notify = None + +from swh.core import config +from swh.core.cli import CONTEXT_SETTINGS +from swh.model.model import SHA1_SIZE +from swh.storage import get_storage +from swh.objstorage import get_objstorage + +from swh.journal.client import JournalClient +from swh.journal.replay import is_hash_in_bytearray +from swh.journal.replay import process_replay_objects +from swh.journal.replay import process_replay_objects_content +from swh.journal.backfill import JournalBackfiller + + +@click.group(name='journal', context_settings=CONTEXT_SETTINGS) +@click.option('--config-file', '-C', default=None, + type=click.Path(exists=True, dir_okay=False,), + help="Configuration file.") +@click.pass_context +def cli(ctx, config_file): + """Software Heritage Journal tools. + + The journal is a persistent logger of changes to the archive, with + publish-subscribe support. + + """ + if not config_file: + config_file = os.environ.get('SWH_CONFIG_FILENAME') + + if config_file: + if not os.path.exists(config_file): + raise ValueError('%s does not exist' % config_file) + conf = config.read(config_file) + else: + conf = {} + + ctx.ensure_object(dict) + + ctx.obj['config'] = conf + + +def get_journal_client(ctx, **kwargs): + conf = ctx.obj['config'].get('journal', {}) + conf.update({k: v for (k, v) in kwargs.items() if v not in (None, ())}) + if not conf.get('brokers'): + ctx.fail('You must specify at least one kafka broker.') + if not isinstance(conf['brokers'], (list, tuple)): + conf['brokers'] = [conf['brokers']] + return JournalClient(**conf) + + +@cli.command() +@click.option('--stop-after-objects', '-n', default=None, type=int, + help='Stop after processing this many objects. Default is to ' + 'run forever.') +@click.pass_context +def replay(ctx, stop_after_objects): + """Fill a Storage by reading a Journal. + + There can be several 'replayers' filling a Storage as long as they use + the same `group-id`. 
+    """
+    conf = ctx.obj['config']
+    try:
+        storage = get_storage(**conf.pop('storage'))
+    except KeyError:
+        ctx.fail('You must have a storage configured in your config file.')
+
+    client = get_journal_client(
+        ctx, stop_after_objects=stop_after_objects)
+    worker_fn = functools.partial(process_replay_objects, storage=storage)
+
+    if notify:
+        notify('READY=1')
+
+    try:
+        client.process(worker_fn)
+    except KeyboardInterrupt:
+        ctx.exit(0)
+    else:
+        print('Done.')
+    finally:
+        if notify:
+            notify('STOPPING=1')
+        client.close()
+
+
+@cli.command()
+@click.argument('object_type')
+@click.option('--start-object', default=None)
+@click.option('--end-object', default=None)
+@click.option('--dry-run', is_flag=True, default=False)
+@click.pass_context
+def backfiller(ctx, object_type, start_object, end_object, dry_run):
+    """Run the backfiller.
+
+    The backfiller lists objects from a Storage and produces journal entries
+    for them.
+
+    Typically used to rebuild a journal or compensate for missing objects in a
+    journal (e.g. due to a downtime of the latter).
+
+    The configuration file requires the following entries:
+    - brokers: a list of kafka endpoints (the journal) in which entries will be
+               added.
+    - storage_dbconn: URL to connect to the storage DB.
+    - prefix: the prefix of the topics (topics will be <prefix>.<object_type>).
+    - client_id: the kafka client ID.
+
+    """
+    conf = ctx.obj['config']
+    backfiller = JournalBackfiller(conf)
+
+    if notify:
+        notify('READY=1')
+
+    try:
+        backfiller.run(
+            object_type=object_type,
+            start_object=start_object, end_object=end_object,
+            dry_run=dry_run)
+    except KeyboardInterrupt:
+        if notify:
+            notify('STOPPING=1')
+        ctx.exit(0)
+
+
+@cli.command('content-replay')
+@click.option('--stop-after-objects', '-n', default=None, type=int,
+              help='Stop after processing this many objects. Default is to '
+                   'run forever.')
+@click.option('--exclude-sha1-file', default=None, type=click.File('rb'),
+              help='File containing a sorted array of hashes to be excluded.')
+@click.option('--check-dst/--no-check-dst', default=True,
+              help='Check whether the destination contains the object before '
+                   'copying.')
+@click.pass_context
+def content_replay(ctx, stop_after_objects, exclude_sha1_file, check_dst):
+    """Fill a destination Object Storage (typically a mirror) by reading a Journal
+    and retrieving objects from an existing source ObjStorage.
+
+    There can be several 'replayers' filling a given ObjStorage as long as they
+    use the same `group-id`. You can use the `KAFKA_GROUP_INSTANCE_ID`
+    environment variable to use KIP-345 static group membership.
+
+    This service retrieves object ids to copy from the 'content' topic. It will
+    only copy an object's content if the object's description in the kafka
+    message has its status set to 'visible'.
+
+    `--exclude-sha1-file` may be used to exclude some hashes to speed up the
+    replay in case many of the contents are already in the destination
+    objstorage. It must contain a concatenation of all (sha1) hashes,
+    and it must be sorted.
+    This file will not be fully loaded into memory at any given time,
+    so it can be arbitrarily large.
+
+    `--check-dst` sets whether the replayer should check in the destination
+    ObjStorage before copying an object. You can turn that off if you know
+    you're copying to an empty ObjStorage.
+ """ + conf = ctx.obj['config'] + try: + objstorage_src = get_objstorage(**conf.pop('objstorage_src')) + except KeyError: + ctx.fail('You must have a source objstorage configured in ' + 'your config file.') + try: + objstorage_dst = get_objstorage(**conf.pop('objstorage_dst')) + except KeyError: + ctx.fail('You must have a destination objstorage configured ' + 'in your config file.') + + if exclude_sha1_file: + map_ = mmap.mmap(exclude_sha1_file.fileno(), 0, prot=mmap.PROT_READ) + if map_.size() % SHA1_SIZE != 0: + ctx.fail('--exclude-sha1 must link to a file whose size is an ' + 'exact multiple of %d bytes.' % SHA1_SIZE) + nb_excluded_hashes = int(map_.size()/SHA1_SIZE) + + def exclude_fn(obj): + return is_hash_in_bytearray(obj['sha1'], map_, nb_excluded_hashes) + else: + exclude_fn = None + + client = get_journal_client( + ctx, stop_after_objects=stop_after_objects, object_types=('content',)) + worker_fn = functools.partial( + process_replay_objects_content, + src=objstorage_src, dst=objstorage_dst, exclude_fn=exclude_fn, + check_dst=check_dst) + + if notify: + notify('READY=1') + + try: + client.process(worker_fn) + except KeyboardInterrupt: + ctx.exit(0) + else: + print('Done.') + finally: + if notify: + notify('STOPPING=1') + client.close() + + +def main(): + logging.basicConfig() + return cli(auto_envvar_prefix='SWH_JOURNAL') + + +if __name__ == '__main__': + main() diff --git a/swh/journal/client.py b/swh/journal/client.py new file mode 100644 --- /dev/null +++ b/swh/journal/client.py @@ -0,0 +1,282 @@ +# Copyright (C) 2017 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from collections import defaultdict +import logging +import os +import time +from typing import Any, Dict, List, Optional, Set, Tuple, Union + +from confluent_kafka import Consumer, KafkaException, KafkaError + +from .serializers import kafka_to_value +from swh.journal import DEFAULT_PREFIX + +logger = logging.getLogger(__name__) +rdkafka_logger = logging.getLogger(__name__ + '.rdkafka') + + +# Only accepted offset reset policy accepted +ACCEPTED_OFFSET_RESET = ['earliest', 'latest'] + +# Only accepted object types +ACCEPTED_OBJECT_TYPES = [ + 'content', + 'directory', + 'revision', + 'release', + 'snapshot', + 'origin', + 'origin_visit' +] + +# Errors that Kafka raises too often and are not useful; therefore they +# we lower their log level to DEBUG instead of INFO. +_SPAMMY_ERRORS = [ + KafkaError._NO_OFFSET, +] + + +def _error_cb(error): + if error.fatal(): + raise KafkaException(error) + if error.code() in _SPAMMY_ERRORS: + logger.debug('Received non-fatal kafka error: %s', error) + else: + logger.info('Received non-fatal kafka error: %s', error) + + +def _on_commit(error, partitions): + if error is not None: + _error_cb(error) + + +class JournalClient: + """A base client for the Software Heritage journal. + + The current implementation of the journal uses Apache Kafka + brokers to publish messages under a given topic prefix, with each + object type using a specific topic under that prefix. If the `prefix` + argument is None (default value), it will take the default value + `'swh.journal.objects'`. + + Clients subscribe to events specific to each object type as listed in the + `object_types` argument (if unset, defaults to all accepted object types). 
+ + Clients can be sharded by setting the `group_id` to a common + value across instances. The journal will share the message + throughput across the nodes sharing the same group_id. + + Messages are processed by the `worker_fn` callback passed to the `process` + method, in batches of maximum `batch_size` messages (defaults to 200). + + If set, the processing stops after processing `stop_after_objects` messages + in total. + + `stop_on_eof` stops the processing when the client has reached the end of + each partition in turn. + + `auto_offset_reset` sets the behavior of the client when the consumer group + initializes: `'earliest'` (the default) processes all objects since the + inception of the topics; `''` + + Any other named argument is passed directly to KafkaConsumer(). + + """ + def __init__( + self, + brokers: Union[str, List[str]], + group_id: str, + prefix: Optional[str] = None, + object_types: Optional[List[str]] = None, + stop_after_objects: Optional[int] = None, + batch_size: int = 200, + process_timeout: Optional[float] = None, + auto_offset_reset: str = 'earliest', + stop_on_eof: bool = False, + **kwargs + ): + if prefix is None: + prefix = DEFAULT_PREFIX + if object_types is None: + object_types = ACCEPTED_OBJECT_TYPES + if auto_offset_reset not in ACCEPTED_OFFSET_RESET: + raise ValueError( + 'Option \'auto_offset_reset\' only accept %s, not %s' % + (ACCEPTED_OFFSET_RESET, auto_offset_reset)) + + for object_type in object_types: + if object_type not in ACCEPTED_OBJECT_TYPES: + raise ValueError( + 'Option \'object_types\' only accepts %s, not %s.' % + (ACCEPTED_OBJECT_TYPES, object_type)) + + if batch_size <= 0: + raise ValueError("Option 'batch_size' needs to be positive") + + self.value_deserializer = kafka_to_value + + if isinstance(brokers, str): + brokers = [brokers] + + debug_logging = rdkafka_logger.isEnabledFor(logging.DEBUG) + if debug_logging and 'debug' not in kwargs: + kwargs['debug'] = 'consumer' + + # Static group instance id management + group_instance_id = os.environ.get('KAFKA_GROUP_INSTANCE_ID') + if group_instance_id: + kwargs['group.instance.id'] = group_instance_id + + if 'group.instance.id' in kwargs: + # When doing static consumer group membership, set a higher default + # session timeout. The session timeout is the duration after which + # the broker considers that a consumer has left the consumer group + # for good, and triggers a rebalance. Considering our current + # processing pattern, 10 minutes gives the consumer ample time to + # restart before that happens. + if 'session.timeout.ms' not in kwargs: + kwargs['session.timeout.ms'] = 10 * 60 * 1000 # 10 minutes + + if 'session.timeout.ms' in kwargs: + # When the session timeout is set, rdkafka requires the max poll + # interval to be set to a higher value; the max poll interval is + # rdkafka's way of figuring out whether the client's message + # processing thread has stalled: when the max poll interval lapses + # between two calls to consumer.poll(), rdkafka leaves the consumer + # group and terminates the connection to the brokers. 
+ # + # We default to 1.5 times the session timeout + if 'max.poll.interval.ms' not in kwargs: + kwargs['max.poll.interval.ms'] = ( + kwargs['session.timeout.ms'] // 2 * 3 + ) + + consumer_settings = { + **kwargs, + 'bootstrap.servers': ','.join(brokers), + 'auto.offset.reset': auto_offset_reset, + 'group.id': group_id, + 'on_commit': _on_commit, + 'error_cb': _error_cb, + 'enable.auto.commit': False, + 'logger': rdkafka_logger, + } + + self.stop_on_eof = stop_on_eof + if self.stop_on_eof: + consumer_settings['enable.partition.eof'] = True + + logger.debug('Consumer settings: %s', consumer_settings) + + self.consumer = Consumer(consumer_settings) + + topics = ['%s.%s' % (prefix, object_type) + for object_type in object_types] + + logger.debug('Upstream topics: %s', + self.consumer.list_topics(timeout=10)) + logger.debug('Subscribing to: %s', topics) + + self.consumer.subscribe(topics=topics) + + self.stop_after_objects = stop_after_objects + self.process_timeout = process_timeout + self.eof_reached: Set[Tuple[str, str]] = set() + self.batch_size = batch_size + + self._object_types = object_types + + def process(self, worker_fn): + """Polls Kafka for a batch of messages, and calls the worker_fn + with these messages. + + Args: + worker_fn Callable[Dict[str, List[dict]]]: Function called with + the messages as + argument. + """ + start_time = time.monotonic() + total_objects_processed = 0 + + while True: + # timeout for message poll + timeout = 1.0 + + elapsed = time.monotonic() - start_time + if self.process_timeout: + # +0.01 to prevent busy-waiting on / spamming consumer.poll. + # consumer.consume() returns shortly before X expired + # (a matter of milliseconds), so after it returns a first + # time, it would then be called with a timeout in the order + # of milliseconds, therefore returning immediately, then be + # called again, etc. + if elapsed + 0.01 >= self.process_timeout: + break + + timeout = self.process_timeout - elapsed + + batch_size = self.batch_size + if self.stop_after_objects: + if total_objects_processed >= self.stop_after_objects: + break + + # clamp batch size to avoid overrunning stop_after_objects + batch_size = min( + self.stop_after_objects-total_objects_processed, + batch_size, + ) + + messages = self.consumer.consume( + timeout=timeout, num_messages=batch_size) + if not messages: + continue + + batch_processed, at_eof = self.handle_messages(messages, worker_fn) + total_objects_processed += batch_processed + if at_eof: + break + + return total_objects_processed + + def handle_messages(self, messages, worker_fn): + objects: Dict[str, List[Any]] = defaultdict(list) + nb_processed = 0 + + for message in messages: + error = message.error() + if error is not None: + if error.code() == KafkaError._PARTITION_EOF: + self.eof_reached.add( + (message.topic(), message.partition()) + ) + else: + _error_cb(error) + continue + + nb_processed += 1 + + object_type = message.topic().split('.')[-1] + # Got a message from a topic we did not subscribe to. 
+ assert object_type in self._object_types, object_type + + objects[object_type].append(self.deserialize_message(message)) + + if objects: + worker_fn(dict(objects)) + self.consumer.commit() + + at_eof = (self.stop_on_eof and all( + (tp.topic, tp.partition) in self.eof_reached + for tp in self.consumer.assignment() + )) + + return nb_processed, at_eof + + def deserialize_message(self, message): + return self.value_deserializer(message.value()) + + def close(self): + self.consumer.close() diff --git a/swh/journal/direct_writer.py b/swh/journal/direct_writer.py new file mode 100644 --- /dev/null +++ b/swh/journal/direct_writer.py @@ -0,0 +1,7 @@ +# Copyright (C) 2019 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +# for BW compat +from .writer.kafka import KafkaJournalWriter as DirectKafkaWriter # noqa diff --git a/swh/journal/fixer.py b/swh/journal/fixer.py new file mode 100644 --- /dev/null +++ b/swh/journal/fixer.py @@ -0,0 +1,268 @@ +import copy +import logging +from typing import Any, Dict, List, Optional +from swh.model.identifiers import normalize_timestamp + +logger = logging.getLogger(__name__) + + +def _fix_content(content: Dict[str, Any]) -> Dict[str, Any]: + """Filters-out invalid 'perms' key that leaked from swh.model.from_disk + to the journal. + + >>> _fix_content({'perms': 0o100644, 'sha1_git': b'foo'}) + {'sha1_git': b'foo'} + + >>> _fix_content({'sha1_git': b'bar'}) + {'sha1_git': b'bar'} + + """ + content = content.copy() + content.pop('perms', None) + return content + + +def _fix_revision_pypi_empty_string(rev): + """PyPI loader failed to encode empty strings as bytes, see: + swh:1:rev:8f0095ee0664867055d03de9bcc8f95b91d8a2b9 + or https://forge.softwareheritage.org/D1772 + """ + rev = { + **rev, + 'author': rev['author'].copy(), + 'committer': rev['committer'].copy(), + } + if rev['author'].get('email') == '': + rev['author']['email'] = b'' + if rev['author'].get('name') == '': + rev['author']['name'] = b'' + if rev['committer'].get('email') == '': + rev['committer']['email'] = b'' + if rev['committer'].get('name') == '': + rev['committer']['name'] = b'' + return rev + + +def _fix_revision_transplant_source(rev): + if rev.get('metadata') and rev['metadata'].get('extra_headers'): + rev = copy.deepcopy(rev) + rev['metadata']['extra_headers'] = [ + [key, value.encode('ascii')] + if key == 'transplant_source' and isinstance(value, str) + else [key, value] + for (key, value) in rev['metadata']['extra_headers']] + return rev + + +def _check_date(date): + """Returns whether the date can be represented in backends with sane + limits on timestamps and timezones (resp. signed 64-bits and + signed 16 bits), and that microseconds is valid (ie. between 0 and 10^6). + """ + if date is None: + return True + date = normalize_timestamp(date) + return (-2**63 <= date['timestamp']['seconds'] < 2**63) \ + and (0 <= date['timestamp']['microseconds'] < 10**6) \ + and (-2**15 <= date['offset'] < 2**15) + + +def _check_revision_date(rev): + """Exclude revisions with invalid dates. + See https://forge.softwareheritage.org/T1339""" + return _check_date(rev['date']) and _check_date(rev['committer_date']) + + +def _fix_revision(revision: Dict[str, Any]) -> Optional[Dict]: + """Fix various legacy revision issues. + + Fix author/committer person: + + >>> from pprint import pprint + >>> date = { + ... 'timestamp': { + ... 
'seconds': 1565096932, + ... 'microseconds': 0, + ... }, + ... 'offset': 0, + ... } + >>> rev0 = _fix_revision({ + ... 'id': b'rev-id', + ... 'author': {'fullname': b'', 'name': '', 'email': ''}, + ... 'committer': {'fullname': b'', 'name': '', 'email': ''}, + ... 'date': date, + ... 'committer_date': date, + ... 'type': 'git', + ... 'message': '', + ... 'directory': b'dir-id', + ... 'synthetic': False, + ... }) + >>> rev0['author'] + {'fullname': b'', 'name': b'', 'email': b''} + >>> rev0['committer'] + {'fullname': b'', 'name': b'', 'email': b''} + + Fix type of 'transplant_source' extra headers: + + >>> rev1 = _fix_revision({ + ... 'id': b'rev-id', + ... 'author': {'fullname': b'', 'name': '', 'email': ''}, + ... 'committer': {'fullname': b'', 'name': '', 'email': ''}, + ... 'date': date, + ... 'committer_date': date, + ... 'metadata': { + ... 'extra_headers': [ + ... ['time_offset_seconds', b'-3600'], + ... ['transplant_source', '29c154a012a70f49df983625090434587622b39e'] + ... ]}, + ... 'type': 'git', + ... 'message': '', + ... 'directory': b'dir-id', + ... 'synthetic': False, + ... }) + >>> pprint(rev1['metadata']['extra_headers']) + [['time_offset_seconds', b'-3600'], + ['transplant_source', b'29c154a012a70f49df983625090434587622b39e']] + + Revision with invalid date are filtered: + + >>> from copy import deepcopy + >>> invalid_date1 = deepcopy(date) + >>> invalid_date1['timestamp']['microseconds'] = 1000000000 # > 10^6 + >>> rev = _fix_revision({ + ... 'author': {'fullname': b'', 'name': '', 'email': ''}, + ... 'committer': {'fullname': b'', 'name': '', 'email': ''}, + ... 'date': invalid_date1, + ... 'committer_date': date, + ... }) + >>> rev is None + True + + >>> invalid_date2 = deepcopy(date) + >>> invalid_date2['timestamp']['seconds'] = 2**70 # > 10^63 + >>> rev = _fix_revision({ + ... 'author': {'fullname': b'', 'name': '', 'email': ''}, + ... 'committer': {'fullname': b'', 'name': '', 'email': ''}, + ... 'date': invalid_date2, + ... 'committer_date': date, + ... }) + >>> rev is None + True + + >>> invalid_date3 = deepcopy(date) + >>> invalid_date3['offset'] = 2**20 # > 10^15 + >>> rev = _fix_revision({ + ... 'author': {'fullname': b'', 'name': '', 'email': ''}, + ... 'committer': {'fullname': b'', 'name': '', 'email': ''}, + ... 'date': date, + ... 'committer_date': invalid_date3, + ... }) + >>> rev is None + True + + """ # noqa + rev = _fix_revision_pypi_empty_string(revision) + rev = _fix_revision_transplant_source(rev) + if not _check_revision_date(rev): + logger.warning('Invalid revision date detected: %(revision)s', { + 'revision': rev + }) + return None + return rev + + +def _fix_origin_visit(visit: Dict) -> Dict: + """Fix various legacy origin visit issues. + + `visit['origin']` is a dict instead of an URL: + + >>> from datetime import datetime, timezone + >>> from pprint import pprint + >>> date = datetime(2020, 2, 27, 14, 39, 19, tzinfo=timezone.utc) + >>> pprint(_fix_origin_visit({ + ... 'origin': {'url': 'http://foo'}, + ... 'date': date, + ... 'type': 'git', + ... 'status': 'ongoing', + ... 'snapshot': None, + ... })) + {'date': datetime.datetime(2020, 2, 27, 14, 39, 19, tzinfo=datetime.timezone.utc), + 'metadata': None, + 'origin': 'http://foo', + 'snapshot': None, + 'status': 'ongoing', + 'type': 'git'} + + `visit['type']` is missing , but `origin['visit']['type']` exists: + + >>> pprint(_fix_origin_visit( + ... {'origin': {'type': 'hg', 'url': 'http://foo'}, + ... 'date': date, + ... 'status': 'ongoing', + ... 'snapshot': None, + ... 
})) + {'date': datetime.datetime(2020, 2, 27, 14, 39, 19, tzinfo=datetime.timezone.utc), + 'metadata': None, + 'origin': 'http://foo', + 'snapshot': None, + 'status': 'ongoing', + 'type': 'hg'} + + Old visit format (origin_visit with no type) raises: + + >>> _fix_origin_visit({ + ... 'origin': {'url': 'http://foo'}, + ... 'date': date, + ... 'status': 'ongoing', + ... 'snapshot': None + ... }) + Traceback (most recent call last): + ... + ValueError: Old origin visit format detected... + + >>> _fix_origin_visit({ + ... 'origin': 'http://foo', + ... 'date': date, + ... 'status': 'ongoing', + ... 'snapshot': None + ... }) + Traceback (most recent call last): + ... + ValueError: Old origin visit format detected... + + """ # noqa + visit = visit.copy() + if 'type' not in visit: + if isinstance(visit['origin'], dict) and 'type' in visit['origin']: + # Very old version of the schema: visits did not have a type, + # but their 'origin' field was a dict with a 'type' key. + visit['type'] = visit['origin']['type'] + else: + # Very old schema version: 'type' is missing, stop early + + # We expect the journal's origin_visit topic to no longer reference + # such visits. If it does, the replayer must crash so we can fix + # the journal's topic. + raise ValueError(f'Old origin visit format detected: {visit}') + if isinstance(visit['origin'], dict): + # Old version of the schema: visit['origin'] was a dict. + visit['origin'] = visit['origin']['url'] + if 'metadata' not in visit: + visit['metadata'] = None + return visit + + +def fix_objects(object_type: str, objects: List[Dict]) -> List[Dict]: + """ + Fix legacy objects from the journal to bring them up to date with the + latest storage schema. + """ + if object_type == 'content': + return [_fix_content(v) for v in objects] + elif object_type == 'revision': + revisions = [_fix_revision(v) for v in objects] + return [rev for rev in revisions if rev is not None] + elif object_type == 'origin_visit': + return [_fix_origin_visit(v) for v in objects] + else: + return objects diff --git a/swh/journal/py.typed b/swh/journal/py.typed new file mode 100644 --- /dev/null +++ b/swh/journal/py.typed @@ -0,0 +1 @@ +# Marker file for PEP 561. 
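Illustrative note (not part of the patch): fix_objects() above is what the replayer calls on every batch before converting records into model objects. Below is a minimal usage sketch, assuming only the code added in swh/journal/fixer.py; the sample visit dict is made up, mirroring the doctests above.

    from datetime import datetime, timezone

    from swh.journal.fixer import fix_objects

    # A legacy origin_visit record: no top-level 'type', dict-valued 'origin'.
    legacy_visits = [{
        'origin': {'type': 'hg', 'url': 'https://example.org/repo'},
        'date': datetime(2020, 2, 27, 14, 39, 19, tzinfo=timezone.utc),
        'status': 'ongoing',
        'snapshot': None,
    }]

    [fixed] = fix_objects('origin_visit', legacy_visits)
    assert fixed['origin'] == 'https://example.org/repo'  # url extracted from the dict
    assert fixed['type'] == 'hg'                          # type pulled up from the origin
    assert fixed['metadata'] is None                      # missing key filled in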
diff --git a/swh/journal/replay.py b/swh/journal/replay.py new file mode 100644 --- /dev/null +++ b/swh/journal/replay.py @@ -0,0 +1,393 @@ +# Copyright (C) 2019-2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import logging +from time import time +from typing import ( + Any, Callable, Dict, Iterable, List, Optional +) + +from sentry_sdk import capture_exception, push_scope +try: + from systemd.daemon import notify +except ImportError: + notify = None + +from tenacity import ( + retry, retry_if_exception_type, stop_after_attempt, + wait_random_exponential, +) + +from swh.core.statsd import statsd +from swh.journal.fixer import fix_objects +from swh.model.hashutil import hash_to_hex + +from swh.model.model import ( + BaseContent, BaseModel, Content, Directory, Origin, OriginVisit, Revision, + SHA1_SIZE, SkippedContent, Snapshot, Release +) +from swh.objstorage.objstorage import ( + ID_HASH_ALGO, ObjNotFoundError, ObjStorage, +) +from swh.storage.exc import HashCollision + +logger = logging.getLogger(__name__) + +GRAPH_OPERATIONS_METRIC = "swh_graph_replayer_operations_total" +GRAPH_DURATION_METRIC = "swh_graph_replayer_duration_seconds" +CONTENT_OPERATIONS_METRIC = "swh_content_replayer_operations_total" +CONTENT_RETRY_METRIC = "swh_content_replayer_retries_total" +CONTENT_BYTES_METRIC = "swh_content_replayer_bytes" +CONTENT_DURATION_METRIC = "swh_content_replayer_duration_seconds" + + +object_converter_fn: Dict[str, Callable[[Dict], BaseModel]] = { + 'origin': Origin.from_dict, + 'origin_visit': OriginVisit.from_dict, + 'snapshot': Snapshot.from_dict, + 'revision': Revision.from_dict, + 'release': Release.from_dict, + 'directory': Directory.from_dict, + 'content': Content.from_dict, + 'skipped_content': SkippedContent.from_dict, +} + + +def process_replay_objects(all_objects, *, storage): + for (object_type, objects) in all_objects.items(): + logger.debug("Inserting %s %s objects", len(objects), object_type) + with statsd.timed(GRAPH_DURATION_METRIC, + tags={'object_type': object_type}): + _insert_objects(object_type, objects, storage) + statsd.increment(GRAPH_OPERATIONS_METRIC, len(objects), + tags={'object_type': object_type}) + if notify: + notify('WATCHDOG=1') + + +def collision_aware_content_add( + content_add_fn: Callable[[Iterable[Any]], None], + contents: List[BaseContent]) -> None: + """Add contents to storage. If a hash collision is detected, an error is + logged. Then this adds the other non colliding contents to the storage. 
+ + Args: + content_add_fn: Storage content callable + contents: List of contents or skipped contents to add to storage + + """ + if not contents: + return + colliding_content_hashes: List[Dict[str, Any]] = [] + while True: + try: + content_add_fn(contents) + except HashCollision as e: + colliding_content_hashes.append({ + 'algo': e.algo, + 'hash': e.hash_id, # hex hash id + 'objects': e.colliding_contents # hex hashes + }) + colliding_hashes = e.colliding_content_hashes() + # Drop the colliding contents from the transaction + contents = [c for c in contents + if c.hashes() not in colliding_hashes] + else: + # Successfully added contents, we are done + break + if colliding_content_hashes: + for collision in colliding_content_hashes: + logger.error('Collision detected: %(collision)s', { + 'collision': collision + }) + + +def _insert_objects(object_type: str, objects: List[Dict], storage) -> None: + """Insert objects of type object_type in the storage. + + """ + objects = fix_objects(object_type, objects) + + if object_type == 'content': + contents: List[BaseContent] = [] + skipped_contents: List[BaseContent] = [] + for content in objects: + c = BaseContent.from_dict(content) + if isinstance(c, SkippedContent): + skipped_contents.append(c) + else: + contents.append(c) + + collision_aware_content_add( + storage.skipped_content_add, skipped_contents) + collision_aware_content_add( + storage.content_add_metadata, contents) + elif object_type == 'origin_visit': + visits: List[OriginVisit] = [] + origins: List[Origin] = [] + for obj in objects: + visit = OriginVisit.from_dict(obj) + visits.append(visit) + origins.append(Origin(url=visit.origin)) + storage.origin_add(origins) + storage.origin_visit_upsert(visits) + elif object_type in ( + 'directory', 'revision', 'release', 'snapshot', 'origin' + ): + method = getattr(storage, object_type + '_add') + method(object_converter_fn[object_type](o) for o in objects) + else: + logger.warning('Received a series of %s, this should not happen', + object_type) + + +def is_hash_in_bytearray(hash_, array, nb_hashes, hash_size=SHA1_SIZE): + """ + Checks if the given hash is in the provided `array`. The array must be + a *sorted* list of sha1 hashes, and contain `nb_hashes` hashes + (so its size must by `nb_hashes*hash_size` bytes). + + Args: + hash_ (bytes): the hash to look for + array (bytes): a sorted concatenated array of hashes (may be of + any type supporting slice indexing, eg. 
:class:`mmap.mmap`) + nb_hashes (int): number of hashes in the array + hash_size (int): size of a hash (defaults to 20, for SHA1) + + Example: + + >>> import os + >>> hash1 = os.urandom(20) + >>> hash2 = os.urandom(20) + >>> hash3 = os.urandom(20) + >>> array = b''.join(sorted([hash1, hash2])) + >>> is_hash_in_bytearray(hash1, array, 2) + True + >>> is_hash_in_bytearray(hash2, array, 2) + True + >>> is_hash_in_bytearray(hash3, array, 2) + False + """ + if len(hash_) != hash_size: + raise ValueError('hash_ does not match the provided hash_size.') + + def get_hash(position): + return array[position*hash_size:(position+1)*hash_size] + + # Regular dichotomy: + left = 0 + right = nb_hashes + while left < right-1: + middle = int((right+left)/2) + pivot = get_hash(middle) + if pivot == hash_: + return True + elif pivot < hash_: + left = middle + else: + right = middle + return get_hash(left) == hash_ + + +class ReplayError(Exception): + """An error occurred during the replay of an object""" + def __init__(self, operation, *, obj_id, exc): + self.operation = operation + self.obj_id = hash_to_hex(obj_id) + self.exc = exc + + def __str__(self): + return "ReplayError(doing %s, %s, %s)" % ( + self.operation, self.obj_id, self.exc + ) + + +def log_replay_retry(retry_obj, sleep, last_result): + """Log a retry of the content replayer""" + exc = last_result.exception() + logger.debug('Retry operation %(operation)s on %(obj_id)s: %(exc)s', + {'operation': exc.operation, 'obj_id': exc.obj_id, + 'exc': str(exc.exc)}) + + statsd.increment(CONTENT_RETRY_METRIC, tags={ + 'operation': exc.operation, + 'attempt': str(retry_obj.statistics['attempt_number']), + }) + + +def log_replay_error(last_attempt): + """Log a replay error to sentry""" + exc = last_attempt.exception() + with push_scope() as scope: + scope.set_tag('operation', exc.operation) + scope.set_extra('obj_id', exc.obj_id) + capture_exception(exc.exc) + + logger.error( + 'Failed operation %(operation)s on %(obj_id)s after %(retries)s' + ' retries: %(exc)s', { + 'obj_id': exc.obj_id, 'operation': exc.operation, + 'exc': str(exc.exc), 'retries': last_attempt.attempt_number, + }) + + return None + + +CONTENT_REPLAY_RETRIES = 3 + +content_replay_retry = retry( + retry=retry_if_exception_type(ReplayError), + stop=stop_after_attempt(CONTENT_REPLAY_RETRIES), + wait=wait_random_exponential(multiplier=1, max=60), + before_sleep=log_replay_retry, + retry_error_callback=log_replay_error, +) + + +@content_replay_retry +def copy_object(obj_id, src, dst): + hex_obj_id = hash_to_hex(obj_id) + obj = '' + try: + with statsd.timed(CONTENT_DURATION_METRIC, tags={'request': 'get'}): + obj = src.get(obj_id) + logger.debug('retrieved %(obj_id)s', {'obj_id': hex_obj_id}) + + with statsd.timed(CONTENT_DURATION_METRIC, tags={'request': 'put'}): + dst.add(obj, obj_id=obj_id, check_presence=False) + logger.debug('copied %(obj_id)s', {'obj_id': hex_obj_id}) + statsd.increment(CONTENT_BYTES_METRIC, len(obj)) + except ObjNotFoundError: + logger.error('Failed to copy %(obj_id)s: object not found', + {'obj_id': hex_obj_id}) + raise + except Exception as exc: + raise ReplayError('copy', obj_id=obj_id, exc=exc) from None + return len(obj) + + +@content_replay_retry +def obj_in_objstorage(obj_id, dst): + """Check if an object is already in an objstorage, tenaciously""" + try: + return obj_id in dst + except Exception as exc: + raise ReplayError('in_dst', obj_id=obj_id, exc=exc) from None + + +def process_replay_objects_content( + all_objects: Dict[str, List[dict]], + *, + src: 
ObjStorage, + dst: ObjStorage, + exclude_fn: Optional[Callable[[dict], bool]] = None, + check_dst: bool = True, +): + """ + Takes a list of records from Kafka (see + :py:func:`swh.journal.client.JournalClient.process`) and copies them + from the `src` objstorage to the `dst` objstorage, if: + + * `obj['status']` is `'visible'` + * `exclude_fn(obj)` is `False` (if `exclude_fn` is provided) + * `obj['sha1'] not in dst` (if `check_dst` is True) + + Args: + all_objects: Objects passed by the Kafka client. Most importantly, + `all_objects['content'][*]['sha1']` is the sha1 hash of each + content. + src: An object storage (see :py:func:`swh.objstorage.get_objstorage`) + dst: An object storage (see :py:func:`swh.objstorage.get_objstorage`) + exclude_fn: Determines whether an object should be copied. + check_dst: Determines whether we should check the destination + objstorage before copying. + + Example: + + >>> from swh.objstorage import get_objstorage + >>> src = get_objstorage('memory', {}) + >>> dst = get_objstorage('memory', {}) + >>> id1 = src.add(b'foo bar') + >>> id2 = src.add(b'baz qux') + >>> kafka_partitions = { + ... 'content': [ + ... { + ... 'sha1': id1, + ... 'status': 'visible', + ... }, + ... { + ... 'sha1': id2, + ... 'status': 'visible', + ... }, + ... ] + ... } + >>> process_replay_objects_content( + ... kafka_partitions, src=src, dst=dst, + ... exclude_fn=lambda obj: obj['sha1'] == id1) + >>> id1 in dst + False + >>> id2 in dst + True + """ + vol = [] + nb_skipped = 0 + nb_failures = 0 + t0 = time() + + for (object_type, objects) in all_objects.items(): + if object_type != 'content': + logger.warning( + 'Received a series of %s, this should not happen', + object_type) + continue + for obj in objects: + obj_id = obj[ID_HASH_ALGO] + if obj['status'] != 'visible': + nb_skipped += 1 + logger.debug('skipped %s (status=%s)', + hash_to_hex(obj_id), obj['status']) + statsd.increment(CONTENT_OPERATIONS_METRIC, + tags={"decision": "skipped", + "status": obj["status"]}) + elif exclude_fn and exclude_fn(obj): + nb_skipped += 1 + logger.debug('skipped %s (manually excluded)', + hash_to_hex(obj_id)) + statsd.increment(CONTENT_OPERATIONS_METRIC, + tags={"decision": "excluded"}) + elif check_dst and obj_in_objstorage(obj_id, dst): + nb_skipped += 1 + logger.debug('skipped %s (in dst)', hash_to_hex(obj_id)) + statsd.increment(CONTENT_OPERATIONS_METRIC, + tags={"decision": "in_dst"}) + else: + try: + copied = copy_object(obj_id, src, dst) + except ObjNotFoundError: + nb_skipped += 1 + statsd.increment(CONTENT_OPERATIONS_METRIC, + tags={"decision": "not_in_src"}) + else: + if copied is None: + nb_failures += 1 + statsd.increment(CONTENT_OPERATIONS_METRIC, + tags={"decision": "failed"}) + else: + vol.append(copied) + statsd.increment(CONTENT_OPERATIONS_METRIC, + tags={"decision": "copied"}) + + dt = time() - t0 + logger.info( + 'processed %s content objects in %.1fsec ' + '(%.1f obj/sec, %.1fMB/sec) - %d failed - %d skipped', + len(vol), dt, + len(vol)/dt, + sum(vol)/1024/1024/dt, + nb_failures, + nb_skipped) + + if notify: + notify('WATCHDOG=1') diff --git a/swh/journal/serializers.py b/swh/journal/serializers.py new file mode 100644 --- /dev/null +++ b/swh/journal/serializers.py @@ -0,0 +1,32 @@ +# Copyright (C) 2016-2017 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import msgpack + +from swh.core.api.serializers 
import msgpack_dumps, msgpack_loads + + +def key_to_kafka(key): + """Serialize a key, possibly a dict, in a predictable way""" + p = msgpack.Packer(use_bin_type=True) + if isinstance(key, dict): + return p.pack_map_pairs(sorted(key.items())) + else: + return p.pack(key) + + +def kafka_to_key(kafka_key): + """Deserialize a key""" + return msgpack.loads(kafka_key) + + +def value_to_kafka(value): + """Serialize some data for storage in kafka""" + return msgpack_dumps(value) + + +def kafka_to_value(kafka_value): + """Deserialize some data stored in kafka""" + return msgpack_loads(kafka_value) diff --git a/swh/journal/writer/__init__.py b/swh/journal/writer/__init__.py new file mode 100644 --- /dev/null +++ b/swh/journal/writer/__init__.py @@ -0,0 +1,27 @@ +# Copyright (C) 2019 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import warnings + + +def get_journal_writer(cls, **kwargs): + if 'args' in kwargs: + warnings.warn( + 'Explicit "args" key is deprecated, use keys directly instead.', + DeprecationWarning) + kwargs = kwargs['args'] + + if cls == 'inmemory': # FIXME: Remove inmemory in due time + warnings.warn("cls = 'inmemory' is deprecated, use 'memory' instead", + DeprecationWarning) + cls = 'memory' + if cls == 'memory': + from .inmemory import InMemoryJournalWriter as JournalWriter + elif cls == 'kafka': + from .kafka import KafkaJournalWriter as JournalWriter + else: + raise ValueError('Unknown journal writer class `%s`' % cls) + + return JournalWriter(**kwargs) diff --git a/swh/journal/writer/inmemory.py b/swh/journal/writer/inmemory.py new file mode 100644 --- /dev/null +++ b/swh/journal/writer/inmemory.py @@ -0,0 +1,28 @@ +# Copyright (C) 2019 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import logging +import copy + +from swh.model.model import BaseModel + +logger = logging.getLogger(__name__) + + +class InMemoryJournalWriter: + def __init__(self): + # Share the list of objects across processes, for RemoteAPI tests. + self.objects = [] + + def write_addition(self, object_type, object_): + if isinstance(object_, BaseModel): + object_ = object_.to_dict() + self.objects.append((object_type, copy.deepcopy(object_))) + + write_update = write_addition + + def write_additions(self, object_type, objects): + for object_ in objects: + self.write_addition(object_type, object_) diff --git a/swh/journal/writer/kafka.py b/swh/journal/writer/kafka.py new file mode 100644 --- /dev/null +++ b/swh/journal/writer/kafka.py @@ -0,0 +1,115 @@ +# Copyright (C) 2019 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import logging + +from confluent_kafka import Producer, KafkaException + +from swh.model.hashutil import DEFAULT_ALGORITHMS +from swh.model.model import BaseModel + +from swh.journal.serializers import key_to_kafka, value_to_kafka + +logger = logging.getLogger(__name__) + + +class KafkaJournalWriter: + """This class is instantiated and used by swh-storage to write incoming + new objects to Kafka before adding them to the storage backend + (eg. 
postgresql) itself."""
+    def __init__(self, brokers, prefix, client_id, producer_config=None):
+        self._prefix = prefix
+
+        if isinstance(brokers, str):
+            brokers = [brokers]
+
+        if not producer_config:
+            producer_config = {}
+
+        self.producer = Producer({
+            'bootstrap.servers': ','.join(brokers),
+            'client.id': client_id,
+            'on_delivery': self._on_delivery,
+            'error_cb': self._error_cb,
+            'logger': logger,
+            'acks': 'all',
+            **producer_config,
+        })
+
+    def _error_cb(self, error):
+        if error.fatal():
+            raise KafkaException(error)
+        logger.info('Received non-fatal kafka error: %s', error)
+
+    def _on_delivery(self, error, message):
+        if error is not None:
+            self._error_cb(error)
+
+    def send(self, topic, key, value):
+        self.producer.produce(
+            topic=topic,
+            key=key_to_kafka(key),
+            value=value_to_kafka(value),
+        )
+
+        # Need to service the callbacks regularly by calling poll
+        self.producer.poll(0)
+
+    def flush(self):
+        self.producer.flush()
+
+    def _get_key(self, object_type, object_):
+        if object_type in ('revision', 'release', 'directory', 'snapshot'):
+            return object_['id']
+        elif object_type == 'content':
+            return object_['sha1']  # TODO: use a dict of hashes
+        elif object_type == 'skipped_content':
+            return {
+                hash: object_[hash]
+                for hash in DEFAULT_ALGORITHMS
+            }
+        elif object_type == 'origin':
+            return {'url': object_['url']}
+        elif object_type == 'origin_visit':
+            return {
+                'origin': object_['origin'],
+                'date': str(object_['date']),
+            }
+        else:
+            raise ValueError('Unknown object type: %s.' % object_type)
+
+    def _sanitize_object(self, object_type, object_):
+        if object_type == 'origin_visit':
+            return {
+                **object_,
+                'date': str(object_['date']),
+            }
+        elif object_type == 'origin':
+            assert 'id' not in object_
+        return object_
+
+    def _write_addition(self, object_type, object_):
+        """Write a single object to the journal"""
+        if isinstance(object_, BaseModel):
+            object_ = object_.to_dict()
+        topic = '%s.%s' % (self._prefix, object_type)
+        key = self._get_key(object_type, object_)
+        dict_ = self._sanitize_object(object_type, object_)
+        logger.debug('topic: %s, key: %s, value: %s', topic, key, dict_)
+        self.send(topic, key=key, value=dict_)
+
+    def write_addition(self, object_type, object_):
+        """Write a single object to the journal"""
+        self._write_addition(object_type, object_)
+        self.flush()
+
+    write_update = write_addition
+
+    def write_additions(self, object_type, objects):
+        """Write a set of objects to the journal"""
+        for object_ in objects:
+            self._write_addition(object_type, object_)
+
+        self.flush()
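
Illustrative note (not part of the patch): a minimal sketch of how the writer added above can be wired up through the get_journal_writer() factory from swh/journal/writer/__init__.py. The broker address, topic prefix and client id below are placeholder values, not configuration shipped by this diff.

    from swh.journal.writer import get_journal_writer

    writer = get_journal_writer(
        cls='kafka',
        brokers=['localhost:9092'],    # placeholder broker
        prefix='swh.journal.objects',  # same as DEFAULT_PREFIX in swh/journal/__init__.py
        client_id='example-producer',  # placeholder client id
    )

    # The object is keyed by _get_key() and sanitized by _sanitize_object(),
    # then produced to the '<prefix>.origin' topic.
    writer.write_addition(object_type='origin',
                          object_={'url': 'https://example.org/repo'})

Note the flush semantics shown in the class: write_addition() and write_additions() both flush the producer before returning, while the backfiller additionally calls writer.producer.flush() after each processed range.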