diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index f811500..f44dbd8 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,59 +1,53 @@ repos: - repo: https://github.com/pre-commit/pre-commit-hooks rev: v2.4.0 hooks: - id: trailing-whitespace - id: check-json - id: check-yaml # we need the master of pyflakes to have support for @overload on methods, # so we use a local config for now # - repo: https://gitlab.com/pycqa/flake8 # hooks: # - id: flake8 - repo: local hooks: - id: flake8 name: flake8 entry: flake8 pass_filenames: true language: system types: [python] - repo: https://github.com/codespell-project/codespell rev: v1.16.0 hooks: - id: codespell - repo: local hooks: - id: mypy name: mypy entry: mypy args: [swh] pass_filenames: false language: system types: [python] +- repo: https://github.com/python/black + rev: 19.10b0 + hooks: + - id: black + # unfortunately, we are far from being able to enable this... # - repo: https://github.com/PyCQA/pydocstyle.git # rev: 4.0.0 # hooks: # - id: pydocstyle # name: pydocstyle # description: pydocstyle is a static analysis tool for checking compliance with Python docstring conventions. # entry: pydocstyle --convention=google # language: python # types: [python] -# black requires py3.6+ -#- repo: https://github.com/python/black -# rev: 19.3b0 -# hooks: -# - id: black -# language_version: python3 -#- repo: https://github.com/asottile/blacken-docs -# rev: v1.0.0-1 -# hooks: -# - id: blacken-docs -# additional_dependencies: [black==19.3b0] diff --git a/setup.py b/setup.py index c743ab7..78e49ef 100755 --- a/setup.py +++ b/setup.py @@ -1,71 +1,71 @@ #!/usr/bin/env python3 # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from setuptools import setup, find_packages from os import path from io import open here = path.abspath(path.dirname(__file__)) # Get the long description from the README file -with open(path.join(here, 'README.md'), encoding='utf-8') as f: +with open(path.join(here, "README.md"), encoding="utf-8") as f: long_description = f.read() def parse_requirements(name=None): if name: - reqf = 'requirements-%s.txt' % name + reqf = "requirements-%s.txt" % name else: - reqf = 'requirements.txt' + reqf = "requirements.txt" requirements = [] if not path.exists(reqf): return requirements with open(reqf) as f: for line in f.readlines(): line = line.strip() - if not line or line.startswith('#'): + if not line or line.startswith("#"): continue requirements.append(line) return requirements setup( - name='swh.journal', - description='Software Heritage Journal utilities', + name="swh.journal", + description="Software Heritage Journal utilities", long_description=long_description, - long_description_content_type='text/markdown', - author='Software Heritage developers', - author_email='swh-devel@inria.fr', - url='https://forge.softwareheritage.org/diffusion/DJNL/', + long_description_content_type="text/markdown", + author="Software Heritage developers", + author_email="swh-devel@inria.fr", + url="https://forge.softwareheritage.org/diffusion/DJNL/", packages=find_packages(), scripts=[], - entry_points=''' + entry_points=""" [console_scripts] swh-journal=swh.journal.cli:main [swh.cli.subcommands] journal=swh.journal.cli:cli - ''', - install_requires=parse_requirements() + parse_requirements('swh'), - setup_requires=['vcversioner'], - extras_require={'testing': parse_requirements('test')}, + """, + install_requires=parse_requirements() + parse_requirements("swh"), + setup_requires=["vcversioner"], + extras_require={"testing": parse_requirements("test")}, vcversioner={}, include_package_data=True, classifiers=[ "Programming Language :: Python :: 3", "Intended Audience :: Developers", "License :: OSI Approved :: GNU General Public License v3 (GPLv3)", "Operating System :: OS Independent", "Development Status :: 5 - Production/Stable", ], project_urls={ - 'Bug Reports': 'https://forge.softwareheritage.org/maniphest', - 'Funding': 'https://www.softwareheritage.org/donate', - 'Source': 'https://forge.softwareheritage.org/source/swh-journal', + "Bug Reports": "https://forge.softwareheritage.org/maniphest", + "Funding": "https://www.softwareheritage.org/donate", + "Source": "https://forge.softwareheritage.org/source/swh-journal", }, ) diff --git a/swh/journal/__init__.py b/swh/journal/__init__.py index 3dd2c0c..3fcff67 100644 --- a/swh/journal/__init__.py +++ b/swh/journal/__init__.py @@ -1,7 +1,7 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # the default prefix for kafka's topics -DEFAULT_PREFIX = 'swh.journal.objects' +DEFAULT_PREFIX = "swh.journal.objects" diff --git a/swh/journal/backfill.py b/swh/journal/backfill.py index a00654c..902b897 100644 --- a/swh/journal/backfill.py +++ b/swh/journal/backfill.py @@ -1,458 +1,489 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Module defining journal backfiller classes. Those backfiller goal is to produce back part or all of the objects from the storage to the journal topics At the moment, a first naive implementation is the JournalBackfiller. It simply reads the objects from the storage and sends every object identifier back to the journal. """ import logging from .writer.kafka import KafkaJournalWriter from swh.core.db import BaseDb from swh.storage.converters import db_to_release, db_to_revision logger = logging.getLogger(__name__) PARTITION_KEY = { - 'content': 'sha1', - 'skipped_content': None, # unused - 'directory': 'id', - 'revision': 'revision.id', - 'release': 'release.id', - 'snapshot': 'id', - 'origin': 'id', - 'origin_visit': 'origin_visit.origin', + "content": "sha1", + "skipped_content": None, # unused + "directory": "id", + "revision": "revision.id", + "release": "release.id", + "snapshot": "id", + "origin": "id", + "origin_visit": "origin_visit.origin", } COLUMNS = { - 'content': [ - 'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', 'status', - 'ctime' + "content": [ + "sha1", + "sha1_git", + "sha256", + "blake2s256", + "length", + "status", + "ctime", ], - 'skipped_content': [ - 'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', 'ctime', - 'status', 'reason', + "skipped_content": [ + "sha1", + "sha1_git", + "sha256", + "blake2s256", + "length", + "ctime", + "status", + "reason", ], - 'directory': ['id', 'dir_entries', 'file_entries', 'rev_entries'], - 'revision': [ + "directory": ["id", "dir_entries", "file_entries", "rev_entries"], + "revision": [ ("revision.id", "id"), "date", "date_offset", "committer_date", "committer_date_offset", "type", "directory", "message", "synthetic", "metadata", "date_neg_utc_offset", "committer_date_neg_utc_offset", - ("array(select parent_id::bytea from revision_history rh " - "where rh.id = revision.id order by rh.parent_rank asc)", - "parents"), + ( + "array(select parent_id::bytea from revision_history rh " + "where rh.id = revision.id order by rh.parent_rank asc)", + "parents", + ), ("a.id", "author_id"), ("a.name", "author_name"), ("a.email", "author_email"), ("a.fullname", "author_fullname"), ("c.id", "committer_id"), ("c.name", "committer_name"), ("c.email", "committer_email"), ("c.fullname", "committer_fullname"), ], - 'release': [ + "release": [ ("release.id", "id"), "date", "date_offset", "comment", ("release.name", "name"), "synthetic", "date_neg_utc_offset", "target", "target_type", ("a.id", "author_id"), ("a.name", "author_name"), ("a.email", "author_email"), ("a.fullname", "author_fullname"), ], - 'snapshot': ['id', 'object_id'], - 'origin': ['type', 'url'], - 'origin_visit': ['visit', 'origin.type', 'origin_visit.type', - 'url', 'date', 'snapshot', 'status', 'metadata'], + "snapshot": ["id", "object_id"], + "origin": ["type", "url"], + "origin_visit": [ + "visit", + "origin.type", + "origin_visit.type", + "url", + "date", + "snapshot", + "status", + "metadata", + ], } JOINS = { - 'release': ['person a on release.author=a.id'], - 'revision': ['person a on revision.author=a.id', - 'person c on revision.committer=c.id'], - 'origin_visit': ['origin on origin_visit.origin=origin.id'], + "release": ["person a on release.author=a.id"], + "revision": [ + "person a on revision.author=a.id", + "person c on revision.committer=c.id", + ], + "origin_visit": ["origin on origin_visit.origin=origin.id"], } def directory_converter(db, directory): """Convert directory from the flat representation to swh model compatible objects. """ - columns = ['target', 'name', 'perms'] - query_template = ''' + columns = ["target", "name", "perms"] + query_template = """ select %(columns)s from directory_entry_%(type)s where id in %%s - ''' + """ - types = ['file', 'dir', 'rev'] + types = ["file", "dir", "rev"] entries = [] with db.cursor() as cur: for type in types: - ids = directory.pop('%s_entries' % type) + ids = directory.pop("%s_entries" % type) if not ids: continue query = query_template % { - 'columns': ','.join(columns), - 'type': type, + "columns": ",".join(columns), + "type": type, } - cur.execute(query, (tuple(ids), )) + cur.execute(query, (tuple(ids),)) for row in cur: entry = dict(zip(columns, row)) - entry['type'] = type + entry["type"] = type entries.append(entry) - directory['entries'] = entries + directory["entries"] = entries return directory def revision_converter(db, revision): """Convert revision from the flat representation to swh model compatible objects. """ return db_to_revision(revision) def release_converter(db, release): """Convert release from the flat representation to swh model compatible objects. """ release = db_to_release(release) - if 'author' in release and release['author']: - del release['author']['id'] + if "author" in release and release["author"]: + del release["author"]["id"] return release def snapshot_converter(db, snapshot): """Convert snapshot from the flat representation to swh model compatible objects. """ - columns = ['name', 'target', 'target_type'] - query = ''' + columns = ["name", "target", "target_type"] + query = """ select %s from snapshot_branches sbs inner join snapshot_branch sb on sb.object_id=sbs.branch_id where sbs.snapshot_id=%%s - ''' % ', '.join(columns) + """ % ", ".join( + columns + ) with db.cursor() as cur: - cur.execute(query, (snapshot.pop('object_id'), )) + cur.execute(query, (snapshot.pop("object_id"),)) branches = {} for name, *row in cur: branch = dict(zip(columns[1:], row)) - if not branch['target'] and not branch['target_type']: + if not branch["target"] and not branch["target_type"]: branch = None branches[name] = branch - snapshot['branches'] = branches + snapshot["branches"] = branches return snapshot def origin_visit_converter(db, origin_visit): origin = { - 'type': origin_visit.pop('origin.type'), - 'url': origin_visit.pop('url'), + "type": origin_visit.pop("origin.type"), + "url": origin_visit.pop("url"), } - origin_visit['origin'] = origin - origin_visit['type'] = origin_visit.pop('origin_visit.type') + origin_visit["origin"] = origin + origin_visit["type"] = origin_visit.pop("origin_visit.type") return origin_visit CONVERTERS = { - 'directory': directory_converter, - 'revision': revision_converter, - 'release': release_converter, - 'snapshot': snapshot_converter, - 'origin_visit': origin_visit_converter, + "directory": directory_converter, + "revision": revision_converter, + "release": release_converter, + "snapshot": snapshot_converter, + "origin_visit": origin_visit_converter, } def object_to_offset(object_id, numbits): """Compute the index of the range containing object id, when dividing space into 2^numbits. Args: object_id (str): The hex representation of object_id numbits (int): Number of bits in which we divide input space Returns: The index of the range containing object id """ q, r = divmod(numbits, 8) length = q + (r != 0) shift_bits = 8 - r if r else 0 - truncated_id = object_id[:length * 2] + truncated_id = object_id[: length * 2] if len(truncated_id) < length * 2: - truncated_id += '0' * (length * 2 - len(truncated_id)) + truncated_id += "0" * (length * 2 - len(truncated_id)) truncated_id_bytes = bytes.fromhex(truncated_id) - return int.from_bytes(truncated_id_bytes, byteorder='big') >> shift_bits + return int.from_bytes(truncated_id_bytes, byteorder="big") >> shift_bits def byte_ranges(numbits, start_object=None, end_object=None): """Generate start/end pairs of bytes spanning numbits bits and constrained by optional start_object and end_object. Args: numbits (int): Number of bits in which we divide input space start_object (str): Hex object id contained in the first range returned end_object (str): Hex object id contained in the last range returned Yields: 2^numbits pairs of bytes """ q, r = divmod(numbits, 8) length = q + (r != 0) shift_bits = 8 - r if r else 0 def to_bytes(i): - return int.to_bytes(i << shift_bits, length=length, byteorder='big') + return int.to_bytes(i << shift_bits, length=length, byteorder="big") start_offset = 0 end_offset = 1 << numbits if start_object is not None: start_offset = object_to_offset(start_object, numbits) if end_object is not None: end_offset = object_to_offset(end_object, numbits) + 1 for start in range(start_offset, end_offset): end = start + 1 if start == 0: yield None, to_bytes(end) elif end == 1 << numbits: yield to_bytes(start), None else: yield to_bytes(start), to_bytes(end) def integer_ranges(start, end, block_size=1000): for start in range(start, end, block_size): if start == 0: yield None, block_size elif start + block_size > end: yield start, end else: yield start, start + block_size RANGE_GENERATORS = { - 'content': lambda start, end: byte_ranges(24, start, end), - 'skipped_content': lambda start, end: [(None, None)], - 'directory': lambda start, end: byte_ranges(24, start, end), - 'revision': lambda start, end: byte_ranges(24, start, end), - 'release': lambda start, end: byte_ranges(16, start, end), - 'snapshot': lambda start, end: byte_ranges(16, start, end), - 'origin': integer_ranges, - 'origin_visit': integer_ranges, + "content": lambda start, end: byte_ranges(24, start, end), + "skipped_content": lambda start, end: [(None, None)], + "directory": lambda start, end: byte_ranges(24, start, end), + "revision": lambda start, end: byte_ranges(24, start, end), + "release": lambda start, end: byte_ranges(16, start, end), + "snapshot": lambda start, end: byte_ranges(16, start, end), + "origin": integer_ranges, + "origin_visit": integer_ranges, } def compute_query(obj_type, start, end): columns = COLUMNS.get(obj_type) join_specs = JOINS.get(obj_type, []) - join_clause = '\n'.join('left join %s' % clause for clause in join_specs) + join_clause = "\n".join("left join %s" % clause for clause in join_specs) where = [] where_args = [] if start: - where.append('%(keys)s >= %%s') + where.append("%(keys)s >= %%s") where_args.append(start) if end: - where.append('%(keys)s < %%s') + where.append("%(keys)s < %%s") where_args.append(end) - where_clause = '' + where_clause = "" if where: - where_clause = ('where ' + ' and '.join(where)) % { - 'keys': '(%s)' % PARTITION_KEY[obj_type] + where_clause = ("where " + " and ".join(where)) % { + "keys": "(%s)" % PARTITION_KEY[obj_type] } column_specs = [] column_aliases = [] for column in columns: if isinstance(column, str): column_specs.append(column) column_aliases.append(column) else: - column_specs.append('%s as %s' % column) + column_specs.append("%s as %s" % column) column_aliases.append(column[1]) - query = ''' + query = """ select %(columns)s from %(table)s %(join)s %(where)s - ''' % { - 'columns': ','.join(column_specs), - 'table': obj_type, - 'join': join_clause, - 'where': where_clause, + """ % { + "columns": ",".join(column_specs), + "table": obj_type, + "join": join_clause, + "where": where_clause, } return query, where_args, column_aliases def fetch(db, obj_type, start, end): """Fetch all obj_type's identifiers from db. This opens one connection, stream objects and when done, close the connection. Args: db (BaseDb): Db connection object obj_type (str): Object type start (Union[bytes|Tuple]): Range start identifier end (Union[bytes|Tuple]): Range end identifier Raises: ValueError if obj_type is not supported Yields: Objects in the given range """ query, where_args, column_aliases = compute_query(obj_type, start, end) converter = CONVERTERS.get(obj_type) with db.cursor() as cursor: - logger.debug('Fetching data for table %s', obj_type) - logger.debug('query: %s %s', query, where_args) + logger.debug("Fetching data for table %s", obj_type) + logger.debug("query: %s %s", query, where_args) cursor.execute(query, where_args) for row in cursor: record = dict(zip(column_aliases, row)) if converter: record = converter(db, record) - logger.debug('record: %s' % record) + logger.debug("record: %s" % record) yield record def _format_range_bound(bound): if isinstance(bound, bytes): return bound.hex() else: return str(bound) -MANDATORY_KEYS = ['brokers', 'storage_dbconn', 'prefix', 'client_id'] +MANDATORY_KEYS = ["brokers", "storage_dbconn", "prefix", "client_id"] class JournalBackfiller: """Class in charge of reading the storage's objects and sends those back to the journal's topics. This is designed to be run periodically. """ + def __init__(self, config=None): self.config = config self.check_config(config) def check_config(self, config): missing_keys = [] for key in MANDATORY_KEYS: if not config.get(key): missing_keys.append(key) if missing_keys: raise ValueError( - 'Configuration error: The following keys must be' - ' provided: %s' % (','.join(missing_keys), )) + "Configuration error: The following keys must be" + " provided: %s" % (",".join(missing_keys),) + ) def parse_arguments(self, object_type, start_object, end_object): """Parse arguments Raises: ValueError for unsupported object type ValueError if object ids are not parseable Returns: Parsed start and end object ids """ if object_type not in COLUMNS: - raise ValueError('Object type %s is not supported. ' - 'The only possible values are %s' % ( - object_type, ', '.join(COLUMNS.keys()))) + raise ValueError( + "Object type %s is not supported. " + "The only possible values are %s" + % (object_type, ", ".join(COLUMNS.keys())) + ) - if object_type in ['origin', 'origin_visit']: + if object_type in ["origin", "origin_visit"]: if start_object: start_object = int(start_object) else: start_object = 0 if end_object: end_object = int(end_object) else: end_object = 100 * 1000 * 1000 # hard-coded limit return start_object, end_object def run(self, object_type, start_object, end_object, dry_run=False): """Reads storage's subscribed object types and send them to the journal's reading topic. """ start_object, end_object = self.parse_arguments( - object_type, start_object, end_object) + object_type, start_object, end_object + ) - db = BaseDb.connect(self.config['storage_dbconn']) + db = BaseDb.connect(self.config["storage_dbconn"]) writer = KafkaJournalWriter( - brokers=self.config['brokers'], - prefix=self.config['prefix'], - client_id=self.config['client_id'] + brokers=self.config["brokers"], + prefix=self.config["prefix"], + client_id=self.config["client_id"], ) for range_start, range_end in RANGE_GENERATORS[object_type]( - start_object, end_object): - logger.info('Processing %s range %s to %s', object_type, - _format_range_bound(range_start), - _format_range_bound(range_end)) - - for obj in fetch( - db, object_type, start=range_start, end=range_end, - ): + start_object, end_object + ): + logger.info( + "Processing %s range %s to %s", + object_type, + _format_range_bound(range_start), + _format_range_bound(range_end), + ) + + for obj in fetch(db, object_type, start=range_start, end=range_end,): if dry_run: continue - writer.write_addition(object_type=object_type, - object_=obj) + writer.write_addition(object_type=object_type, object_=obj) writer.producer.flush() -if __name__ == '__main__': +if __name__ == "__main__": print('Please use the "swh-journal backfiller run" command') diff --git a/swh/journal/cli.py b/swh/journal/cli.py index 21a97ea..8928137 100644 --- a/swh/journal/cli.py +++ b/swh/journal/cli.py @@ -1,229 +1,255 @@ # Copyright (C) 2016-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import functools import logging import mmap import os import click try: from systemd.daemon import notify except ImportError: notify = None from swh.core import config from swh.core.cli import CONTEXT_SETTINGS from swh.model.model import SHA1_SIZE from swh.storage import get_storage from swh.objstorage import get_objstorage from swh.journal.client import JournalClient from swh.journal.replay import is_hash_in_bytearray from swh.journal.replay import process_replay_objects from swh.journal.replay import process_replay_objects_content from swh.journal.backfill import JournalBackfiller -@click.group(name='journal', context_settings=CONTEXT_SETTINGS) -@click.option('--config-file', '-C', default=None, - type=click.Path(exists=True, dir_okay=False,), - help="Configuration file.") +@click.group(name="journal", context_settings=CONTEXT_SETTINGS) +@click.option( + "--config-file", + "-C", + default=None, + type=click.Path(exists=True, dir_okay=False,), + help="Configuration file.", +) @click.pass_context def cli(ctx, config_file): """Software Heritage Journal tools. The journal is a persistent logger of changes to the archive, with publish-subscribe support. """ if not config_file: - config_file = os.environ.get('SWH_CONFIG_FILENAME') + config_file = os.environ.get("SWH_CONFIG_FILENAME") if config_file: if not os.path.exists(config_file): - raise ValueError('%s does not exist' % config_file) + raise ValueError("%s does not exist" % config_file) conf = config.read(config_file) else: conf = {} ctx.ensure_object(dict) - ctx.obj['config'] = conf + ctx.obj["config"] = conf def get_journal_client(ctx, **kwargs): - conf = ctx.obj['config'].get('journal', {}) + conf = ctx.obj["config"].get("journal", {}) conf.update({k: v for (k, v) in kwargs.items() if v not in (None, ())}) - if not conf.get('brokers'): - ctx.fail('You must specify at least one kafka broker.') - if not isinstance(conf['brokers'], (list, tuple)): - conf['brokers'] = [conf['brokers']] + if not conf.get("brokers"): + ctx.fail("You must specify at least one kafka broker.") + if not isinstance(conf["brokers"], (list, tuple)): + conf["brokers"] = [conf["brokers"]] return JournalClient(**conf) @cli.command() -@click.option('--stop-after-objects', '-n', default=None, type=int, - help='Stop after processing this many objects. Default is to ' - 'run forever.') +@click.option( + "--stop-after-objects", + "-n", + default=None, + type=int, + help="Stop after processing this many objects. Default is to " "run forever.", +) @click.pass_context def replay(ctx, stop_after_objects): """Fill a Storage by reading a Journal. There can be several 'replayers' filling a Storage as long as they use the same `group-id`. """ - conf = ctx.obj['config'] + conf = ctx.obj["config"] try: - storage = get_storage(**conf.pop('storage')) + storage = get_storage(**conf.pop("storage")) except KeyError: - ctx.fail('You must have a storage configured in your config file.') + ctx.fail("You must have a storage configured in your config file.") - client = get_journal_client( - ctx, stop_after_objects=stop_after_objects) + client = get_journal_client(ctx, stop_after_objects=stop_after_objects) worker_fn = functools.partial(process_replay_objects, storage=storage) if notify: - notify('READY=1') + notify("READY=1") try: client.process(worker_fn) except KeyboardInterrupt: ctx.exit(0) else: - print('Done.') + print("Done.") finally: if notify: - notify('STOPPING=1') + notify("STOPPING=1") client.close() @cli.command() -@click.argument('object_type') -@click.option('--start-object', default=None) -@click.option('--end-object', default=None) -@click.option('--dry-run', is_flag=True, default=False) +@click.argument("object_type") +@click.option("--start-object", default=None) +@click.option("--end-object", default=None) +@click.option("--dry-run", is_flag=True, default=False) @click.pass_context def backfiller(ctx, object_type, start_object, end_object, dry_run): """Run the backfiller The backfiller list objects from a Storage and produce journal entries from there. Typically used to rebuild a journal or compensate for missing objects in a journal (eg. due to a downtime of this later). The configuration file requires the following entries: - brokers: a list of kafka endpoints (the journal) in which entries will be added. - storage_dbconn: URL to connect to the storage DB. - prefix: the prefix of the topics (topics will be .). - client_id: the kafka client ID. """ - conf = ctx.obj['config'] + conf = ctx.obj["config"] backfiller = JournalBackfiller(conf) if notify: - notify('READY=1') + notify("READY=1") try: backfiller.run( object_type=object_type, - start_object=start_object, end_object=end_object, - dry_run=dry_run) + start_object=start_object, + end_object=end_object, + dry_run=dry_run, + ) except KeyboardInterrupt: if notify: - notify('STOPPING=1') + notify("STOPPING=1") ctx.exit(0) -@cli.command('content-replay') -@click.option('--stop-after-objects', '-n', default=None, type=int, - help='Stop after processing this many objects. Default is to ' - 'run forever.') -@click.option('--exclude-sha1-file', default=None, type=click.File('rb'), - help='File containing a sorted array of hashes to be excluded.') -@click.option('--check-dst/--no-check-dst', default=True, - help='Check whether the destination contains the object before ' - 'copying.') +@cli.command("content-replay") +@click.option( + "--stop-after-objects", + "-n", + default=None, + type=int, + help="Stop after processing this many objects. Default is to " "run forever.", +) +@click.option( + "--exclude-sha1-file", + default=None, + type=click.File("rb"), + help="File containing a sorted array of hashes to be excluded.", +) +@click.option( + "--check-dst/--no-check-dst", + default=True, + help="Check whether the destination contains the object before " "copying.", +) @click.pass_context def content_replay(ctx, stop_after_objects, exclude_sha1_file, check_dst): """Fill a destination Object Storage (typically a mirror) by reading a Journal and retrieving objects from an existing source ObjStorage. There can be several 'replayers' filling a given ObjStorage as long as they use the same `group-id`. You can use the `KAFKA_GROUP_INSTANCE_ID` environment variable to use KIP-345 static group membership. This service retrieves object ids to copy from the 'content' topic. It will only copy object's content if the object's description in the kafka nmessage has the status:visible set. `--exclude-sha1-file` may be used to exclude some hashes to speed-up the replay in case many of the contents are already in the destination objstorage. It must contain a concatenation of all (sha1) hashes, and it must be sorted. This file will not be fully loaded into memory at any given time, so it can be arbitrarily large. `--check-dst` sets whether the replayer should check in the destination ObjStorage before copying an object. You can turn that off if you know you're copying to an empty ObjStorage. """ - conf = ctx.obj['config'] + conf = ctx.obj["config"] try: - objstorage_src = get_objstorage(**conf.pop('objstorage_src')) + objstorage_src = get_objstorage(**conf.pop("objstorage_src")) except KeyError: - ctx.fail('You must have a source objstorage configured in ' - 'your config file.') + ctx.fail("You must have a source objstorage configured in " "your config file.") try: - objstorage_dst = get_objstorage(**conf.pop('objstorage_dst')) + objstorage_dst = get_objstorage(**conf.pop("objstorage_dst")) except KeyError: - ctx.fail('You must have a destination objstorage configured ' - 'in your config file.') + ctx.fail( + "You must have a destination objstorage configured " "in your config file." + ) if exclude_sha1_file: map_ = mmap.mmap(exclude_sha1_file.fileno(), 0, prot=mmap.PROT_READ) if map_.size() % SHA1_SIZE != 0: - ctx.fail('--exclude-sha1 must link to a file whose size is an ' - 'exact multiple of %d bytes.' % SHA1_SIZE) + ctx.fail( + "--exclude-sha1 must link to a file whose size is an " + "exact multiple of %d bytes." % SHA1_SIZE + ) nb_excluded_hashes = int(map_.size() / SHA1_SIZE) def exclude_fn(obj): - return is_hash_in_bytearray(obj['sha1'], map_, nb_excluded_hashes) + return is_hash_in_bytearray(obj["sha1"], map_, nb_excluded_hashes) + else: exclude_fn = None client = get_journal_client( - ctx, stop_after_objects=stop_after_objects, object_types=('content',)) + ctx, stop_after_objects=stop_after_objects, object_types=("content",) + ) worker_fn = functools.partial( process_replay_objects_content, - src=objstorage_src, dst=objstorage_dst, exclude_fn=exclude_fn, - check_dst=check_dst) + src=objstorage_src, + dst=objstorage_dst, + exclude_fn=exclude_fn, + check_dst=check_dst, + ) if notify: - notify('READY=1') + notify("READY=1") try: client.process(worker_fn) except KeyboardInterrupt: ctx.exit(0) else: - print('Done.') + print("Done.") finally: if notify: - notify('STOPPING=1') + notify("STOPPING=1") client.close() def main(): logging.basicConfig() - return cli(auto_envvar_prefix='SWH_JOURNAL') + return cli(auto_envvar_prefix="SWH_JOURNAL") -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/swh/journal/client.py b/swh/journal/client.py index a401cea..8f336ac 100644 --- a/swh/journal/client.py +++ b/swh/journal/client.py @@ -1,282 +1,277 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict import logging import os import time from typing import Any, Dict, List, Optional, Set, Tuple, Union from confluent_kafka import Consumer, KafkaException, KafkaError from .serializers import kafka_to_value from swh.journal import DEFAULT_PREFIX logger = logging.getLogger(__name__) -rdkafka_logger = logging.getLogger(__name__ + '.rdkafka') +rdkafka_logger = logging.getLogger(__name__ + ".rdkafka") # Only accepted offset reset policy accepted -ACCEPTED_OFFSET_RESET = ['earliest', 'latest'] +ACCEPTED_OFFSET_RESET = ["earliest", "latest"] # Only accepted object types ACCEPTED_OBJECT_TYPES = [ - 'content', - 'directory', - 'revision', - 'release', - 'snapshot', - 'origin', - 'origin_visit' + "content", + "directory", + "revision", + "release", + "snapshot", + "origin", + "origin_visit", ] # Errors that Kafka raises too often and are not useful; therefore they # we lower their log level to DEBUG instead of INFO. _SPAMMY_ERRORS = [ KafkaError._NO_OFFSET, ] def _error_cb(error): if error.fatal(): raise KafkaException(error) if error.code() in _SPAMMY_ERRORS: - logger.debug('Received non-fatal kafka error: %s', error) + logger.debug("Received non-fatal kafka error: %s", error) else: - logger.info('Received non-fatal kafka error: %s', error) + logger.info("Received non-fatal kafka error: %s", error) def _on_commit(error, partitions): if error is not None: _error_cb(error) class JournalClient: """A base client for the Software Heritage journal. The current implementation of the journal uses Apache Kafka brokers to publish messages under a given topic prefix, with each object type using a specific topic under that prefix. If the `prefix` argument is None (default value), it will take the default value `'swh.journal.objects'`. Clients subscribe to events specific to each object type as listed in the `object_types` argument (if unset, defaults to all accepted object types). Clients can be sharded by setting the `group_id` to a common value across instances. The journal will share the message throughput across the nodes sharing the same group_id. Messages are processed by the `worker_fn` callback passed to the `process` method, in batches of maximum `batch_size` messages (defaults to 200). If set, the processing stops after processing `stop_after_objects` messages in total. `stop_on_eof` stops the processing when the client has reached the end of each partition in turn. `auto_offset_reset` sets the behavior of the client when the consumer group initializes: `'earliest'` (the default) processes all objects since the inception of the topics; `''` Any other named argument is passed directly to KafkaConsumer(). """ + def __init__( - self, - brokers: Union[str, List[str]], - group_id: str, - prefix: Optional[str] = None, - object_types: Optional[List[str]] = None, - stop_after_objects: Optional[int] = None, - batch_size: int = 200, - process_timeout: Optional[float] = None, - auto_offset_reset: str = 'earliest', - stop_on_eof: bool = False, - **kwargs + self, + brokers: Union[str, List[str]], + group_id: str, + prefix: Optional[str] = None, + object_types: Optional[List[str]] = None, + stop_after_objects: Optional[int] = None, + batch_size: int = 200, + process_timeout: Optional[float] = None, + auto_offset_reset: str = "earliest", + stop_on_eof: bool = False, + **kwargs ): if prefix is None: prefix = DEFAULT_PREFIX if object_types is None: object_types = ACCEPTED_OBJECT_TYPES if auto_offset_reset not in ACCEPTED_OFFSET_RESET: raise ValueError( - 'Option \'auto_offset_reset\' only accept %s, not %s' % - (ACCEPTED_OFFSET_RESET, auto_offset_reset)) + "Option 'auto_offset_reset' only accept %s, not %s" + % (ACCEPTED_OFFSET_RESET, auto_offset_reset) + ) for object_type in object_types: if object_type not in ACCEPTED_OBJECT_TYPES: raise ValueError( - 'Option \'object_types\' only accepts %s, not %s.' % - (ACCEPTED_OBJECT_TYPES, object_type)) + "Option 'object_types' only accepts %s, not %s." + % (ACCEPTED_OBJECT_TYPES, object_type) + ) if batch_size <= 0: raise ValueError("Option 'batch_size' needs to be positive") self.value_deserializer = kafka_to_value if isinstance(brokers, str): brokers = [brokers] debug_logging = rdkafka_logger.isEnabledFor(logging.DEBUG) - if debug_logging and 'debug' not in kwargs: - kwargs['debug'] = 'consumer' + if debug_logging and "debug" not in kwargs: + kwargs["debug"] = "consumer" # Static group instance id management - group_instance_id = os.environ.get('KAFKA_GROUP_INSTANCE_ID') + group_instance_id = os.environ.get("KAFKA_GROUP_INSTANCE_ID") if group_instance_id: - kwargs['group.instance.id'] = group_instance_id + kwargs["group.instance.id"] = group_instance_id - if 'group.instance.id' in kwargs: + if "group.instance.id" in kwargs: # When doing static consumer group membership, set a higher default # session timeout. The session timeout is the duration after which # the broker considers that a consumer has left the consumer group # for good, and triggers a rebalance. Considering our current # processing pattern, 10 minutes gives the consumer ample time to # restart before that happens. - if 'session.timeout.ms' not in kwargs: - kwargs['session.timeout.ms'] = 10 * 60 * 1000 # 10 minutes + if "session.timeout.ms" not in kwargs: + kwargs["session.timeout.ms"] = 10 * 60 * 1000 # 10 minutes - if 'session.timeout.ms' in kwargs: + if "session.timeout.ms" in kwargs: # When the session timeout is set, rdkafka requires the max poll # interval to be set to a higher value; the max poll interval is # rdkafka's way of figuring out whether the client's message # processing thread has stalled: when the max poll interval lapses # between two calls to consumer.poll(), rdkafka leaves the consumer # group and terminates the connection to the brokers. # # We default to 1.5 times the session timeout - if 'max.poll.interval.ms' not in kwargs: - kwargs['max.poll.interval.ms'] = ( - kwargs['session.timeout.ms'] // 2 * 3 - ) + if "max.poll.interval.ms" not in kwargs: + kwargs["max.poll.interval.ms"] = kwargs["session.timeout.ms"] // 2 * 3 consumer_settings = { **kwargs, - 'bootstrap.servers': ','.join(brokers), - 'auto.offset.reset': auto_offset_reset, - 'group.id': group_id, - 'on_commit': _on_commit, - 'error_cb': _error_cb, - 'enable.auto.commit': False, - 'logger': rdkafka_logger, + "bootstrap.servers": ",".join(brokers), + "auto.offset.reset": auto_offset_reset, + "group.id": group_id, + "on_commit": _on_commit, + "error_cb": _error_cb, + "enable.auto.commit": False, + "logger": rdkafka_logger, } self.stop_on_eof = stop_on_eof if self.stop_on_eof: - consumer_settings['enable.partition.eof'] = True + consumer_settings["enable.partition.eof"] = True - logger.debug('Consumer settings: %s', consumer_settings) + logger.debug("Consumer settings: %s", consumer_settings) self.consumer = Consumer(consumer_settings) - topics = ['%s.%s' % (prefix, object_type) - for object_type in object_types] + topics = ["%s.%s" % (prefix, object_type) for object_type in object_types] - logger.debug('Upstream topics: %s', - self.consumer.list_topics(timeout=10)) - logger.debug('Subscribing to: %s', topics) + logger.debug("Upstream topics: %s", self.consumer.list_topics(timeout=10)) + logger.debug("Subscribing to: %s", topics) self.consumer.subscribe(topics=topics) self.stop_after_objects = stop_after_objects self.process_timeout = process_timeout self.eof_reached: Set[Tuple[str, str]] = set() self.batch_size = batch_size self._object_types = object_types def process(self, worker_fn): """Polls Kafka for a batch of messages, and calls the worker_fn with these messages. Args: worker_fn Callable[Dict[str, List[dict]]]: Function called with the messages as argument. """ start_time = time.monotonic() total_objects_processed = 0 while True: # timeout for message poll timeout = 1.0 elapsed = time.monotonic() - start_time if self.process_timeout: # +0.01 to prevent busy-waiting on / spamming consumer.poll. # consumer.consume() returns shortly before X expired # (a matter of milliseconds), so after it returns a first # time, it would then be called with a timeout in the order # of milliseconds, therefore returning immediately, then be # called again, etc. if elapsed + 0.01 >= self.process_timeout: break timeout = self.process_timeout - elapsed batch_size = self.batch_size if self.stop_after_objects: if total_objects_processed >= self.stop_after_objects: break # clamp batch size to avoid overrunning stop_after_objects batch_size = min( - self.stop_after_objects - total_objects_processed, - batch_size, + self.stop_after_objects - total_objects_processed, batch_size, ) - messages = self.consumer.consume( - timeout=timeout, num_messages=batch_size) + messages = self.consumer.consume(timeout=timeout, num_messages=batch_size) if not messages: continue batch_processed, at_eof = self.handle_messages(messages, worker_fn) total_objects_processed += batch_processed if at_eof: break return total_objects_processed def handle_messages(self, messages, worker_fn): objects: Dict[str, List[Any]] = defaultdict(list) nb_processed = 0 for message in messages: error = message.error() if error is not None: if error.code() == KafkaError._PARTITION_EOF: - self.eof_reached.add( - (message.topic(), message.partition()) - ) + self.eof_reached.add((message.topic(), message.partition())) else: _error_cb(error) continue nb_processed += 1 - object_type = message.topic().split('.')[-1] + object_type = message.topic().split(".")[-1] # Got a message from a topic we did not subscribe to. assert object_type in self._object_types, object_type objects[object_type].append(self.deserialize_message(message)) if objects: worker_fn(dict(objects)) self.consumer.commit() - at_eof = (self.stop_on_eof and all( + at_eof = self.stop_on_eof and all( (tp.topic, tp.partition) in self.eof_reached for tp in self.consumer.assignment() - )) + ) return nb_processed, at_eof def deserialize_message(self, message): return self.value_deserializer(message.value()) def close(self): self.consumer.close() diff --git a/swh/journal/fixer.py b/swh/journal/fixer.py index a3ff5b1..7322b35 100644 --- a/swh/journal/fixer.py +++ b/swh/journal/fixer.py @@ -1,290 +1,293 @@ import copy import logging from typing import Any, Dict, List, Optional from swh.model.identifiers import normalize_timestamp logger = logging.getLogger(__name__) def _fix_content(content: Dict[str, Any]) -> Dict[str, Any]: """Filters-out invalid 'perms' key that leaked from swh.model.from_disk to the journal. >>> _fix_content({'perms': 0o100644, 'sha1_git': b'foo'}) {'sha1_git': b'foo'} >>> _fix_content({'sha1_git': b'bar'}) {'sha1_git': b'bar'} """ content = content.copy() - content.pop('perms', None) + content.pop("perms", None) return content def _fix_revision_pypi_empty_string(rev): """PyPI loader failed to encode empty strings as bytes, see: swh:1:rev:8f0095ee0664867055d03de9bcc8f95b91d8a2b9 or https://forge.softwareheritage.org/D1772 """ rev = { **rev, - 'author': rev['author'].copy(), - 'committer': rev['committer'].copy(), + "author": rev["author"].copy(), + "committer": rev["committer"].copy(), } - if rev['author'].get('email') == '': - rev['author']['email'] = b'' - if rev['author'].get('name') == '': - rev['author']['name'] = b'' - if rev['committer'].get('email') == '': - rev['committer']['email'] = b'' - if rev['committer'].get('name') == '': - rev['committer']['name'] = b'' + if rev["author"].get("email") == "": + rev["author"]["email"] = b"" + if rev["author"].get("name") == "": + rev["author"]["name"] = b"" + if rev["committer"].get("email") == "": + rev["committer"]["email"] = b"" + if rev["committer"].get("name") == "": + rev["committer"]["name"] = b"" return rev def _fix_revision_transplant_source(rev): - if rev.get('metadata') and rev['metadata'].get('extra_headers'): + if rev.get("metadata") and rev["metadata"].get("extra_headers"): rev = copy.deepcopy(rev) - rev['metadata']['extra_headers'] = [ - [key, value.encode('ascii')] - if key == 'transplant_source' and isinstance(value, str) + rev["metadata"]["extra_headers"] = [ + [key, value.encode("ascii")] + if key == "transplant_source" and isinstance(value, str) else [key, value] - for (key, value) in rev['metadata']['extra_headers']] + for (key, value) in rev["metadata"]["extra_headers"] + ] return rev def _check_date(date): """Returns whether the date can be represented in backends with sane limits on timestamps and timezones (resp. signed 64-bits and signed 16 bits), and that microseconds is valid (ie. between 0 and 10^6). """ if date is None: return True date = normalize_timestamp(date) - return (-2**63 <= date['timestamp']['seconds'] < 2**63) \ - and (0 <= date['timestamp']['microseconds'] < 10**6) \ - and (-2**15 <= date['offset'] < 2**15) + return ( + (-(2 ** 63) <= date["timestamp"]["seconds"] < 2 ** 63) + and (0 <= date["timestamp"]["microseconds"] < 10 ** 6) + and (-(2 ** 15) <= date["offset"] < 2 ** 15) + ) def _check_revision_date(rev): """Exclude revisions with invalid dates. See https://forge.softwareheritage.org/T1339""" - return _check_date(rev['date']) and _check_date(rev['committer_date']) + return _check_date(rev["date"]) and _check_date(rev["committer_date"]) def _fix_revision(revision: Dict[str, Any]) -> Optional[Dict]: """Fix various legacy revision issues. Fix author/committer person: >>> from pprint import pprint >>> date = { ... 'timestamp': { ... 'seconds': 1565096932, ... 'microseconds': 0, ... }, ... 'offset': 0, ... } >>> rev0 = _fix_revision({ ... 'id': b'rev-id', ... 'author': {'fullname': b'', 'name': '', 'email': ''}, ... 'committer': {'fullname': b'', 'name': '', 'email': ''}, ... 'date': date, ... 'committer_date': date, ... 'type': 'git', ... 'message': '', ... 'directory': b'dir-id', ... 'synthetic': False, ... }) >>> rev0['author'] {'fullname': b'', 'name': b'', 'email': b''} >>> rev0['committer'] {'fullname': b'', 'name': b'', 'email': b''} Fix type of 'transplant_source' extra headers: >>> rev1 = _fix_revision({ ... 'id': b'rev-id', ... 'author': {'fullname': b'', 'name': '', 'email': ''}, ... 'committer': {'fullname': b'', 'name': '', 'email': ''}, ... 'date': date, ... 'committer_date': date, ... 'metadata': { ... 'extra_headers': [ ... ['time_offset_seconds', b'-3600'], ... ['transplant_source', '29c154a012a70f49df983625090434587622b39e'] ... ]}, ... 'type': 'git', ... 'message': '', ... 'directory': b'dir-id', ... 'synthetic': False, ... }) >>> pprint(rev1['metadata']['extra_headers']) [['time_offset_seconds', b'-3600'], ['transplant_source', b'29c154a012a70f49df983625090434587622b39e']] Revision with invalid date are filtered: >>> from copy import deepcopy >>> invalid_date1 = deepcopy(date) >>> invalid_date1['timestamp']['microseconds'] = 1000000000 # > 10^6 >>> rev = _fix_revision({ ... 'author': {'fullname': b'', 'name': '', 'email': ''}, ... 'committer': {'fullname': b'', 'name': '', 'email': ''}, ... 'date': invalid_date1, ... 'committer_date': date, ... }) >>> rev is None True >>> invalid_date2 = deepcopy(date) >>> invalid_date2['timestamp']['seconds'] = 2**70 # > 10^63 >>> rev = _fix_revision({ ... 'author': {'fullname': b'', 'name': '', 'email': ''}, ... 'committer': {'fullname': b'', 'name': '', 'email': ''}, ... 'date': invalid_date2, ... 'committer_date': date, ... }) >>> rev is None True >>> invalid_date3 = deepcopy(date) >>> invalid_date3['offset'] = 2**20 # > 10^15 >>> rev = _fix_revision({ ... 'author': {'fullname': b'', 'name': '', 'email': ''}, ... 'committer': {'fullname': b'', 'name': '', 'email': ''}, ... 'date': date, ... 'committer_date': invalid_date3, ... }) >>> rev is None True """ # noqa rev = _fix_revision_pypi_empty_string(revision) rev = _fix_revision_transplant_source(rev) if not _check_revision_date(rev): - logger.warning('Invalid revision date detected: %(revision)s', { - 'revision': rev - }) + logger.warning( + "Invalid revision date detected: %(revision)s", {"revision": rev} + ) return None return rev def _fix_origin(origin: Dict) -> Dict: """Fix legacy origin with type which is no longer part of the model. >>> from pprint import pprint >>> pprint(_fix_origin({ ... 'url': 'http://foo', ... })) {'url': 'http://foo'} >>> pprint(_fix_origin({ ... 'url': 'http://bar', ... 'type': 'foo', ... })) {'url': 'http://bar'} """ o = origin.copy() - o.pop('type', None) + o.pop("type", None) return o def _fix_origin_visit(visit: Dict) -> Dict: """Fix various legacy origin visit issues. `visit['origin']` is a dict instead of an URL: >>> from datetime import datetime, timezone >>> from pprint import pprint >>> date = datetime(2020, 2, 27, 14, 39, 19, tzinfo=timezone.utc) >>> pprint(_fix_origin_visit({ ... 'origin': {'url': 'http://foo'}, ... 'date': date, ... 'type': 'git', ... 'status': 'ongoing', ... 'snapshot': None, ... })) {'date': datetime.datetime(2020, 2, 27, 14, 39, 19, tzinfo=datetime.timezone.utc), 'metadata': None, 'origin': 'http://foo', 'snapshot': None, 'status': 'ongoing', 'type': 'git'} `visit['type']` is missing , but `origin['visit']['type']` exists: >>> pprint(_fix_origin_visit( ... {'origin': {'type': 'hg', 'url': 'http://foo'}, ... 'date': date, ... 'status': 'ongoing', ... 'snapshot': None, ... })) {'date': datetime.datetime(2020, 2, 27, 14, 39, 19, tzinfo=datetime.timezone.utc), 'metadata': None, 'origin': 'http://foo', 'snapshot': None, 'status': 'ongoing', 'type': 'hg'} Old visit format (origin_visit with no type) raises: >>> _fix_origin_visit({ ... 'origin': {'url': 'http://foo'}, ... 'date': date, ... 'status': 'ongoing', ... 'snapshot': None ... }) Traceback (most recent call last): ... ValueError: Old origin visit format detected... >>> _fix_origin_visit({ ... 'origin': 'http://foo', ... 'date': date, ... 'status': 'ongoing', ... 'snapshot': None ... }) Traceback (most recent call last): ... ValueError: Old origin visit format detected... """ # noqa visit = visit.copy() - if 'type' not in visit: - if isinstance(visit['origin'], dict) and 'type' in visit['origin']: + if "type" not in visit: + if isinstance(visit["origin"], dict) and "type" in visit["origin"]: # Very old version of the schema: visits did not have a type, # but their 'origin' field was a dict with a 'type' key. - visit['type'] = visit['origin']['type'] + visit["type"] = visit["origin"]["type"] else: # Very old schema version: 'type' is missing, stop early # We expect the journal's origin_visit topic to no longer reference # such visits. If it does, the replayer must crash so we can fix # the journal's topic. - raise ValueError(f'Old origin visit format detected: {visit}') - if isinstance(visit['origin'], dict): + raise ValueError(f"Old origin visit format detected: {visit}") + if isinstance(visit["origin"], dict): # Old version of the schema: visit['origin'] was a dict. - visit['origin'] = visit['origin']['url'] - if 'metadata' not in visit: - visit['metadata'] = None + visit["origin"] = visit["origin"]["url"] + if "metadata" not in visit: + visit["metadata"] = None return visit def fix_objects(object_type: str, objects: List[Dict]) -> List[Dict]: """ Fix legacy objects from the journal to bring them up to date with the latest storage schema. """ - if object_type == 'content': + if object_type == "content": return [_fix_content(v) for v in objects] - elif object_type == 'revision': + elif object_type == "revision": revisions = [_fix_revision(v) for v in objects] return [rev for rev in revisions if rev is not None] - elif object_type == 'origin': + elif object_type == "origin": return [_fix_origin(v) for v in objects] - elif object_type == 'origin_visit': + elif object_type == "origin_visit": return [_fix_origin_visit(v) for v in objects] else: return objects diff --git a/swh/journal/replay.py b/swh/journal/replay.py index 5461b22..dfc6fab 100644 --- a/swh/journal/replay.py +++ b/swh/journal/replay.py @@ -1,393 +1,413 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging from time import time -from typing import ( - Any, Callable, Dict, Iterable, List, Optional -) +from typing import Any, Callable, Dict, Iterable, List, Optional from sentry_sdk import capture_exception, push_scope + try: from systemd.daemon import notify except ImportError: notify = None from tenacity import ( - retry, retry_if_exception_type, stop_after_attempt, + retry, + retry_if_exception_type, + stop_after_attempt, wait_random_exponential, ) from swh.core.statsd import statsd from swh.journal.fixer import fix_objects from swh.model.hashutil import hash_to_hex from swh.model.model import ( - BaseContent, BaseModel, Content, Directory, Origin, OriginVisit, Revision, - SHA1_SIZE, SkippedContent, Snapshot, Release + BaseContent, + BaseModel, + Content, + Directory, + Origin, + OriginVisit, + Revision, + SHA1_SIZE, + SkippedContent, + Snapshot, + Release, ) from swh.objstorage.objstorage import ( - ID_HASH_ALGO, ObjNotFoundError, ObjStorage, + ID_HASH_ALGO, + ObjNotFoundError, + ObjStorage, ) from swh.storage.exc import HashCollision logger = logging.getLogger(__name__) GRAPH_OPERATIONS_METRIC = "swh_graph_replayer_operations_total" GRAPH_DURATION_METRIC = "swh_graph_replayer_duration_seconds" CONTENT_OPERATIONS_METRIC = "swh_content_replayer_operations_total" CONTENT_RETRY_METRIC = "swh_content_replayer_retries_total" CONTENT_BYTES_METRIC = "swh_content_replayer_bytes" CONTENT_DURATION_METRIC = "swh_content_replayer_duration_seconds" object_converter_fn: Dict[str, Callable[[Dict], BaseModel]] = { - 'origin': Origin.from_dict, - 'origin_visit': OriginVisit.from_dict, - 'snapshot': Snapshot.from_dict, - 'revision': Revision.from_dict, - 'release': Release.from_dict, - 'directory': Directory.from_dict, - 'content': Content.from_dict, - 'skipped_content': SkippedContent.from_dict, + "origin": Origin.from_dict, + "origin_visit": OriginVisit.from_dict, + "snapshot": Snapshot.from_dict, + "revision": Revision.from_dict, + "release": Release.from_dict, + "directory": Directory.from_dict, + "content": Content.from_dict, + "skipped_content": SkippedContent.from_dict, } def process_replay_objects(all_objects, *, storage): for (object_type, objects) in all_objects.items(): logger.debug("Inserting %s %s objects", len(objects), object_type) - with statsd.timed(GRAPH_DURATION_METRIC, - tags={'object_type': object_type}): + with statsd.timed(GRAPH_DURATION_METRIC, tags={"object_type": object_type}): _insert_objects(object_type, objects, storage) - statsd.increment(GRAPH_OPERATIONS_METRIC, len(objects), - tags={'object_type': object_type}) + statsd.increment( + GRAPH_OPERATIONS_METRIC, len(objects), tags={"object_type": object_type} + ) if notify: - notify('WATCHDOG=1') + notify("WATCHDOG=1") def collision_aware_content_add( - content_add_fn: Callable[[Iterable[Any]], None], - contents: List[BaseContent]) -> None: + content_add_fn: Callable[[Iterable[Any]], None], contents: List[BaseContent] +) -> None: """Add contents to storage. If a hash collision is detected, an error is logged. Then this adds the other non colliding contents to the storage. Args: content_add_fn: Storage content callable contents: List of contents or skipped contents to add to storage """ if not contents: return colliding_content_hashes: List[Dict[str, Any]] = [] while True: try: content_add_fn(contents) except HashCollision as e: - colliding_content_hashes.append({ - 'algo': e.algo, - 'hash': e.hash_id, # hex hash id - 'objects': e.colliding_contents # hex hashes - }) + colliding_content_hashes.append( + { + "algo": e.algo, + "hash": e.hash_id, # hex hash id + "objects": e.colliding_contents, # hex hashes + } + ) colliding_hashes = e.colliding_content_hashes() # Drop the colliding contents from the transaction - contents = [c for c in contents - if c.hashes() not in colliding_hashes] + contents = [c for c in contents if c.hashes() not in colliding_hashes] else: # Successfully added contents, we are done break if colliding_content_hashes: for collision in colliding_content_hashes: - logger.error('Collision detected: %(collision)s', { - 'collision': collision - }) + logger.error("Collision detected: %(collision)s", {"collision": collision}) def _insert_objects(object_type: str, objects: List[Dict], storage) -> None: """Insert objects of type object_type in the storage. """ objects = fix_objects(object_type, objects) - if object_type == 'content': + if object_type == "content": contents: List[BaseContent] = [] skipped_contents: List[BaseContent] = [] for content in objects: c = BaseContent.from_dict(content) if isinstance(c, SkippedContent): skipped_contents.append(c) else: contents.append(c) - collision_aware_content_add( - storage.skipped_content_add, skipped_contents) - collision_aware_content_add( - storage.content_add_metadata, contents) - elif object_type == 'origin_visit': + collision_aware_content_add(storage.skipped_content_add, skipped_contents) + collision_aware_content_add(storage.content_add_metadata, contents) + elif object_type == "origin_visit": visits: List[OriginVisit] = [] origins: List[Origin] = [] for obj in objects: visit = OriginVisit.from_dict(obj) visits.append(visit) origins.append(Origin(url=visit.origin)) storage.origin_add(origins) storage.origin_visit_upsert(visits) - elif object_type in ( - 'directory', 'revision', 'release', 'snapshot', 'origin' - ): - method = getattr(storage, object_type + '_add') + elif object_type in ("directory", "revision", "release", "snapshot", "origin"): + method = getattr(storage, object_type + "_add") method(object_converter_fn[object_type](o) for o in objects) else: - logger.warning('Received a series of %s, this should not happen', - object_type) + logger.warning("Received a series of %s, this should not happen", object_type) def is_hash_in_bytearray(hash_, array, nb_hashes, hash_size=SHA1_SIZE): """ Checks if the given hash is in the provided `array`. The array must be a *sorted* list of sha1 hashes, and contain `nb_hashes` hashes (so its size must by `nb_hashes*hash_size` bytes). Args: hash_ (bytes): the hash to look for array (bytes): a sorted concatenated array of hashes (may be of any type supporting slice indexing, eg. :class:`mmap.mmap`) nb_hashes (int): number of hashes in the array hash_size (int): size of a hash (defaults to 20, for SHA1) Example: >>> import os >>> hash1 = os.urandom(20) >>> hash2 = os.urandom(20) >>> hash3 = os.urandom(20) >>> array = b''.join(sorted([hash1, hash2])) >>> is_hash_in_bytearray(hash1, array, 2) True >>> is_hash_in_bytearray(hash2, array, 2) True >>> is_hash_in_bytearray(hash3, array, 2) False """ if len(hash_) != hash_size: - raise ValueError('hash_ does not match the provided hash_size.') + raise ValueError("hash_ does not match the provided hash_size.") def get_hash(position): - return array[position * hash_size:(position + 1) * hash_size] + return array[position * hash_size : (position + 1) * hash_size] # Regular dichotomy: left = 0 right = nb_hashes while left < right - 1: middle = int((right + left) / 2) pivot = get_hash(middle) if pivot == hash_: return True elif pivot < hash_: left = middle else: right = middle return get_hash(left) == hash_ class ReplayError(Exception): """An error occurred during the replay of an object""" + def __init__(self, operation, *, obj_id, exc): self.operation = operation self.obj_id = hash_to_hex(obj_id) self.exc = exc def __str__(self): - return "ReplayError(doing %s, %s, %s)" % ( - self.operation, self.obj_id, self.exc - ) + return "ReplayError(doing %s, %s, %s)" % (self.operation, self.obj_id, self.exc) def log_replay_retry(retry_obj, sleep, last_result): """Log a retry of the content replayer""" exc = last_result.exception() - logger.debug('Retry operation %(operation)s on %(obj_id)s: %(exc)s', - {'operation': exc.operation, 'obj_id': exc.obj_id, - 'exc': str(exc.exc)}) + logger.debug( + "Retry operation %(operation)s on %(obj_id)s: %(exc)s", + {"operation": exc.operation, "obj_id": exc.obj_id, "exc": str(exc.exc)}, + ) - statsd.increment(CONTENT_RETRY_METRIC, tags={ - 'operation': exc.operation, - 'attempt': str(retry_obj.statistics['attempt_number']), - }) + statsd.increment( + CONTENT_RETRY_METRIC, + tags={ + "operation": exc.operation, + "attempt": str(retry_obj.statistics["attempt_number"]), + }, + ) def log_replay_error(last_attempt): """Log a replay error to sentry""" exc = last_attempt.exception() with push_scope() as scope: - scope.set_tag('operation', exc.operation) - scope.set_extra('obj_id', exc.obj_id) + scope.set_tag("operation", exc.operation) + scope.set_extra("obj_id", exc.obj_id) capture_exception(exc.exc) logger.error( - 'Failed operation %(operation)s on %(obj_id)s after %(retries)s' - ' retries: %(exc)s', { - 'obj_id': exc.obj_id, 'operation': exc.operation, - 'exc': str(exc.exc), 'retries': last_attempt.attempt_number, - }) + "Failed operation %(operation)s on %(obj_id)s after %(retries)s" + " retries: %(exc)s", + { + "obj_id": exc.obj_id, + "operation": exc.operation, + "exc": str(exc.exc), + "retries": last_attempt.attempt_number, + }, + ) return None CONTENT_REPLAY_RETRIES = 3 content_replay_retry = retry( retry=retry_if_exception_type(ReplayError), stop=stop_after_attempt(CONTENT_REPLAY_RETRIES), wait=wait_random_exponential(multiplier=1, max=60), before_sleep=log_replay_retry, retry_error_callback=log_replay_error, ) @content_replay_retry def copy_object(obj_id, src, dst): hex_obj_id = hash_to_hex(obj_id) - obj = '' + obj = "" try: - with statsd.timed(CONTENT_DURATION_METRIC, tags={'request': 'get'}): + with statsd.timed(CONTENT_DURATION_METRIC, tags={"request": "get"}): obj = src.get(obj_id) - logger.debug('retrieved %(obj_id)s', {'obj_id': hex_obj_id}) + logger.debug("retrieved %(obj_id)s", {"obj_id": hex_obj_id}) - with statsd.timed(CONTENT_DURATION_METRIC, tags={'request': 'put'}): + with statsd.timed(CONTENT_DURATION_METRIC, tags={"request": "put"}): dst.add(obj, obj_id=obj_id, check_presence=False) - logger.debug('copied %(obj_id)s', {'obj_id': hex_obj_id}) + logger.debug("copied %(obj_id)s", {"obj_id": hex_obj_id}) statsd.increment(CONTENT_BYTES_METRIC, len(obj)) except ObjNotFoundError: - logger.error('Failed to copy %(obj_id)s: object not found', - {'obj_id': hex_obj_id}) + logger.error( + "Failed to copy %(obj_id)s: object not found", {"obj_id": hex_obj_id} + ) raise except Exception as exc: - raise ReplayError('copy', obj_id=obj_id, exc=exc) from None + raise ReplayError("copy", obj_id=obj_id, exc=exc) from None return len(obj) @content_replay_retry def obj_in_objstorage(obj_id, dst): """Check if an object is already in an objstorage, tenaciously""" try: return obj_id in dst except Exception as exc: - raise ReplayError('in_dst', obj_id=obj_id, exc=exc) from None + raise ReplayError("in_dst", obj_id=obj_id, exc=exc) from None def process_replay_objects_content( - all_objects: Dict[str, List[dict]], - *, - src: ObjStorage, - dst: ObjStorage, - exclude_fn: Optional[Callable[[dict], bool]] = None, - check_dst: bool = True, + all_objects: Dict[str, List[dict]], + *, + src: ObjStorage, + dst: ObjStorage, + exclude_fn: Optional[Callable[[dict], bool]] = None, + check_dst: bool = True, ): """ Takes a list of records from Kafka (see :py:func:`swh.journal.client.JournalClient.process`) and copies them from the `src` objstorage to the `dst` objstorage, if: * `obj['status']` is `'visible'` * `exclude_fn(obj)` is `False` (if `exclude_fn` is provided) * `obj['sha1'] not in dst` (if `check_dst` is True) Args: all_objects: Objects passed by the Kafka client. Most importantly, `all_objects['content'][*]['sha1']` is the sha1 hash of each content. src: An object storage (see :py:func:`swh.objstorage.get_objstorage`) dst: An object storage (see :py:func:`swh.objstorage.get_objstorage`) exclude_fn: Determines whether an object should be copied. check_dst: Determines whether we should check the destination objstorage before copying. Example: >>> from swh.objstorage import get_objstorage >>> src = get_objstorage('memory', {}) >>> dst = get_objstorage('memory', {}) >>> id1 = src.add(b'foo bar') >>> id2 = src.add(b'baz qux') >>> kafka_partitions = { ... 'content': [ ... { ... 'sha1': id1, ... 'status': 'visible', ... }, ... { ... 'sha1': id2, ... 'status': 'visible', ... }, ... ] ... } >>> process_replay_objects_content( ... kafka_partitions, src=src, dst=dst, ... exclude_fn=lambda obj: obj['sha1'] == id1) >>> id1 in dst False >>> id2 in dst True """ vol = [] nb_skipped = 0 nb_failures = 0 t0 = time() for (object_type, objects) in all_objects.items(): - if object_type != 'content': + if object_type != "content": logger.warning( - 'Received a series of %s, this should not happen', - object_type) + "Received a series of %s, this should not happen", object_type + ) continue for obj in objects: obj_id = obj[ID_HASH_ALGO] - if obj['status'] != 'visible': + if obj["status"] != "visible": nb_skipped += 1 - logger.debug('skipped %s (status=%s)', - hash_to_hex(obj_id), obj['status']) - statsd.increment(CONTENT_OPERATIONS_METRIC, - tags={"decision": "skipped", - "status": obj["status"]}) + logger.debug( + "skipped %s (status=%s)", hash_to_hex(obj_id), obj["status"] + ) + statsd.increment( + CONTENT_OPERATIONS_METRIC, + tags={"decision": "skipped", "status": obj["status"]}, + ) elif exclude_fn and exclude_fn(obj): nb_skipped += 1 - logger.debug('skipped %s (manually excluded)', - hash_to_hex(obj_id)) - statsd.increment(CONTENT_OPERATIONS_METRIC, - tags={"decision": "excluded"}) + logger.debug("skipped %s (manually excluded)", hash_to_hex(obj_id)) + statsd.increment( + CONTENT_OPERATIONS_METRIC, tags={"decision": "excluded"} + ) elif check_dst and obj_in_objstorage(obj_id, dst): nb_skipped += 1 - logger.debug('skipped %s (in dst)', hash_to_hex(obj_id)) - statsd.increment(CONTENT_OPERATIONS_METRIC, - tags={"decision": "in_dst"}) + logger.debug("skipped %s (in dst)", hash_to_hex(obj_id)) + statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "in_dst"}) else: try: copied = copy_object(obj_id, src, dst) except ObjNotFoundError: nb_skipped += 1 - statsd.increment(CONTENT_OPERATIONS_METRIC, - tags={"decision": "not_in_src"}) + statsd.increment( + CONTENT_OPERATIONS_METRIC, tags={"decision": "not_in_src"} + ) else: if copied is None: nb_failures += 1 - statsd.increment(CONTENT_OPERATIONS_METRIC, - tags={"decision": "failed"}) + statsd.increment( + CONTENT_OPERATIONS_METRIC, tags={"decision": "failed"} + ) else: vol.append(copied) - statsd.increment(CONTENT_OPERATIONS_METRIC, - tags={"decision": "copied"}) + statsd.increment( + CONTENT_OPERATIONS_METRIC, tags={"decision": "copied"} + ) dt = time() - t0 logger.info( - 'processed %s content objects in %.1fsec ' - '(%.1f obj/sec, %.1fMB/sec) - %d failed - %d skipped', - len(vol), dt, + "processed %s content objects in %.1fsec " + "(%.1f obj/sec, %.1fMB/sec) - %d failed - %d skipped", + len(vol), + dt, len(vol) / dt, sum(vol) / 1024 / 1024 / dt, nb_failures, - nb_skipped) + nb_skipped, + ) if notify: - notify('WATCHDOG=1') + notify("WATCHDOG=1") diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py index 92f4c7a..6a30af2 100644 --- a/swh/journal/tests/conftest.py +++ b/swh/journal/tests/conftest.py @@ -1,369 +1,337 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import pytest import logging import random import string from confluent_kafka import Consumer from confluent_kafka.admin import AdminClient, ConfigResource from hypothesis.strategies import one_of from subprocess import Popen from typing import Any, Dict, Iterator, List, Optional, Tuple from pathlib import Path from pytest_kafka import ( - make_zookeeper_process, make_kafka_server, KAFKA_SERVER_CONFIG_TEMPLATE, + make_zookeeper_process, + make_kafka_server, + KAFKA_SERVER_CONFIG_TEMPLATE, ZOOKEEPER_CONFIG_TEMPLATE, ) from swh.model import hypothesis_strategies as strategies from swh.model.hashutil import MultiHash, hash_to_bytes logger = logging.getLogger(__name__) CONTENTS = [ - { - **MultiHash.from_data(b'foo').digest(), - 'length': 3, - 'status': 'visible', - }, + {**MultiHash.from_data(b"foo").digest(), "length": 3, "status": "visible",}, ] duplicate_content1 = { - 'length': 4, - 'sha1': hash_to_bytes( - '44973274ccef6ab4dfaaf86599792fa9c3fe4689'), - 'sha1_git': b'another-foo', - 'blake2s256': b'another-bar', - 'sha256': b'another-baz', - 'status': 'visible', + "length": 4, + "sha1": hash_to_bytes("44973274ccef6ab4dfaaf86599792fa9c3fe4689"), + "sha1_git": b"another-foo", + "blake2s256": b"another-bar", + "sha256": b"another-baz", + "status": "visible", } # Craft a sha1 collision duplicate_content2 = duplicate_content1.copy() -sha1_array = bytearray(duplicate_content1['sha1_git']) +sha1_array = bytearray(duplicate_content1["sha1_git"]) sha1_array[0] += 1 -duplicate_content2['sha1_git'] = bytes(sha1_array) +duplicate_content2["sha1_git"] = bytes(sha1_array) DUPLICATE_CONTENTS = [duplicate_content1, duplicate_content2] COMMITTERS = [ - { - 'fullname': b'foo', - 'name': b'foo', - 'email': b'', - }, - { - 'fullname': b'bar', - 'name': b'bar', - 'email': b'', - } + {"fullname": b"foo", "name": b"foo", "email": b"",}, + {"fullname": b"bar", "name": b"bar", "email": b"",}, ] DATES = [ { - 'timestamp': { - 'seconds': 1234567891, - 'microseconds': 0, - }, - 'offset': 120, - 'negative_utc': False, + "timestamp": {"seconds": 1234567891, "microseconds": 0,}, + "offset": 120, + "negative_utc": False, }, { - 'timestamp': { - 'seconds': 1234567892, - 'microseconds': 0, - }, - 'offset': 120, - 'negative_utc': False, - } + "timestamp": {"seconds": 1234567892, "microseconds": 0,}, + "offset": 120, + "negative_utc": False, + }, ] REVISIONS = [ { - 'id': hash_to_bytes('7026b7c1a2af56521e951c01ed20f255fa054238'), - 'message': b'hello', - 'date': DATES[0], - 'committer': COMMITTERS[0], - 'author': COMMITTERS[0], - 'committer_date': DATES[0], - 'type': 'git', - 'directory': b'\x01' * 20, - 'synthetic': False, - 'metadata': None, - 'parents': [], + "id": hash_to_bytes("7026b7c1a2af56521e951c01ed20f255fa054238"), + "message": b"hello", + "date": DATES[0], + "committer": COMMITTERS[0], + "author": COMMITTERS[0], + "committer_date": DATES[0], + "type": "git", + "directory": b"\x01" * 20, + "synthetic": False, + "metadata": None, + "parents": [], }, { - 'id': hash_to_bytes('368a48fe15b7db2383775f97c6b247011b3f14f4'), - 'message': b'hello again', - 'date': DATES[1], - 'committer': COMMITTERS[1], - 'author': COMMITTERS[1], - 'committer_date': DATES[1], - 'type': 'hg', - 'directory': b'\x02' * 20, - 'synthetic': False, - 'metadata': None, - 'parents': [], + "id": hash_to_bytes("368a48fe15b7db2383775f97c6b247011b3f14f4"), + "message": b"hello again", + "date": DATES[1], + "committer": COMMITTERS[1], + "author": COMMITTERS[1], + "committer_date": DATES[1], + "type": "hg", + "directory": b"\x02" * 20, + "synthetic": False, + "metadata": None, + "parents": [], }, ] RELEASES = [ { - 'id': hash_to_bytes('d81cc0710eb6cf9efd5b920a8453e1e07157b6cd'), - 'name': b'v0.0.1', - 'date': { - 'timestamp': { - 'seconds': 1234567890, - 'microseconds': 0, - }, - 'offset': 120, - 'negative_utc': False, + "id": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), + "name": b"v0.0.1", + "date": { + "timestamp": {"seconds": 1234567890, "microseconds": 0,}, + "offset": 120, + "negative_utc": False, }, - 'author': COMMITTERS[0], - 'target_type': 'revision', - 'target': b'\x04' * 20, - 'message': b'foo', - 'synthetic': False, + "author": COMMITTERS[0], + "target_type": "revision", + "target": b"\x04" * 20, + "message": b"foo", + "synthetic": False, }, ] ORIGINS = [ - { - 'url': 'https://somewhere.org/den/fox', - }, - { - 'url': 'https://overtherainbow.org/fox/den', - } + {"url": "https://somewhere.org/den/fox",}, + {"url": "https://overtherainbow.org/fox/den",}, ] ORIGIN_VISITS = [ { - 'origin': ORIGINS[0]['url'], - 'date': '2013-05-07 04:20:39.369271+00:00', - 'snapshot': None, # TODO - 'status': 'ongoing', # TODO - 'metadata': {'foo': 'bar'}, - 'type': 'git', + "origin": ORIGINS[0]["url"], + "date": "2013-05-07 04:20:39.369271+00:00", + "snapshot": None, # TODO + "status": "ongoing", # TODO + "metadata": {"foo": "bar"}, + "type": "git", }, { - 'origin': ORIGINS[0]['url'], - 'date': '2018-11-27 17:20:39+00:00', - 'snapshot': None, # TODO - 'status': 'ongoing', # TODO - 'metadata': {'baz': 'qux'}, - 'type': 'git', - } + "origin": ORIGINS[0]["url"], + "date": "2018-11-27 17:20:39+00:00", + "snapshot": None, # TODO + "status": "ongoing", # TODO + "metadata": {"baz": "qux"}, + "type": "git", + }, ] # From type to tuple (id, ) OBJECT_TYPE_KEYS = { - 'content': ('sha1', CONTENTS), - 'revision': ('id', REVISIONS), - 'release': ('id', RELEASES), - 'origin': (None, ORIGINS), - 'origin_visit': (None, ORIGIN_VISITS), + "content": ("sha1", CONTENTS), + "revision": ("id", REVISIONS), + "release": ("id", RELEASES), + "origin": (None, ORIGINS), + "origin_visit": (None, ORIGIN_VISITS), } # type: Dict[str, Tuple[Optional[str], List[Dict[str, Any]]]] -KAFKA_ROOT = os.environ.get('SWH_KAFKA_ROOT') -KAFKA_ROOT = KAFKA_ROOT if KAFKA_ROOT else os.path.dirname(__file__) + '/kafka' +KAFKA_ROOT = os.environ.get("SWH_KAFKA_ROOT") +KAFKA_ROOT = KAFKA_ROOT if KAFKA_ROOT else os.path.dirname(__file__) + "/kafka" if not os.path.exists(KAFKA_ROOT): - msg = ('Development error: %s must exist and target an ' - 'existing kafka installation' % KAFKA_ROOT) + msg = ( + "Development error: %s must exist and target an " + "existing kafka installation" % KAFKA_ROOT + ) raise ValueError(msg) -KAFKA_SCRIPTS = Path(KAFKA_ROOT) / 'bin' +KAFKA_SCRIPTS = Path(KAFKA_ROOT) / "bin" -KAFKA_BIN = str(KAFKA_SCRIPTS / 'kafka-server-start.sh') -ZOOKEEPER_BIN = str(KAFKA_SCRIPTS / 'zookeeper-server-start.sh') +KAFKA_BIN = str(KAFKA_SCRIPTS / "kafka-server-start.sh") +ZOOKEEPER_BIN = str(KAFKA_SCRIPTS / "zookeeper-server-start.sh") -ZK_CONFIG_TEMPLATE = ZOOKEEPER_CONFIG_TEMPLATE + '\nadmin.enableServer=false\n' -KAFKA_CONFIG_TEMPLATE = ( - KAFKA_SERVER_CONFIG_TEMPLATE + '\nmessage.max.bytes=104857600\n' -) +ZK_CONFIG_TEMPLATE = ZOOKEEPER_CONFIG_TEMPLATE + "\nadmin.enableServer=false\n" +KAFKA_CONFIG_TEMPLATE = KAFKA_SERVER_CONFIG_TEMPLATE + "\nmessage.max.bytes=104857600\n" # Those defines fixtures -zookeeper_proc = make_zookeeper_process(ZOOKEEPER_BIN, - zk_config_template=ZK_CONFIG_TEMPLATE, - scope='session') -os.environ['KAFKA_LOG4J_OPTS'] = \ - '-Dlog4j.configuration=file:%s/log4j.properties' % \ - os.path.dirname(__file__) -session_kafka_server = make_kafka_server(KAFKA_BIN, 'zookeeper_proc', - kafka_config_template=KAFKA_CONFIG_TEMPLATE, - scope='session') - -kafka_logger = logging.getLogger('kafka') +zookeeper_proc = make_zookeeper_process( + ZOOKEEPER_BIN, zk_config_template=ZK_CONFIG_TEMPLATE, scope="session" +) +os.environ[ + "KAFKA_LOG4J_OPTS" +] = "-Dlog4j.configuration=file:%s/log4j.properties" % os.path.dirname(__file__) +session_kafka_server = make_kafka_server( + KAFKA_BIN, + "zookeeper_proc", + kafka_config_template=KAFKA_CONFIG_TEMPLATE, + scope="session", +) + +kafka_logger = logging.getLogger("kafka") kafka_logger.setLevel(logging.WARN) -@pytest.fixture(scope='function') +@pytest.fixture(scope="function") def kafka_prefix(): """Pick a random prefix for kafka topics on each call""" - return ''.join(random.choice(string.ascii_lowercase) for _ in range(10)) + return "".join(random.choice(string.ascii_lowercase) for _ in range(10)) -@pytest.fixture(scope='function') +@pytest.fixture(scope="function") def kafka_consumer_group(kafka_prefix: str): """Pick a random consumer group for kafka consumers on each call""" return "test-consumer-%s" % kafka_prefix -@pytest.fixture(scope='session') +@pytest.fixture(scope="session") def kafka_admin_client(session_kafka_server: Tuple[Popen, int]) -> AdminClient: - return AdminClient({ - 'bootstrap.servers': 'localhost:%s' % session_kafka_server[1] - }) + return AdminClient({"bootstrap.servers": "localhost:%s" % session_kafka_server[1]}) -@pytest.fixture(scope='function') +@pytest.fixture(scope="function") def kafka_server_config_overrides() -> Dict[str, str]: return {} -@pytest.fixture(scope='function') +@pytest.fixture(scope="function") def kafka_server( - session_kafka_server: Tuple[Popen, int], - kafka_admin_client: AdminClient, - kafka_server_config_overrides: Dict[str, str], + session_kafka_server: Tuple[Popen, int], + kafka_admin_client: AdminClient, + kafka_server_config_overrides: Dict[str, str], ) -> Iterator[Tuple[Popen, int]]: # No overrides, we can just return the original broker connection if not kafka_server_config_overrides: yield session_kafka_server return # This is the minimal operation that the kafka_admin_client gives to # retrieve the cluster metadata, which we need to get the numeric id of the # broker spawned by pytest_kafka. - metadata = kafka_admin_client.list_topics('__consumer_offsets') + metadata = kafka_admin_client.list_topics("__consumer_offsets") broker_ids = [str(broker) for broker in metadata.brokers.keys()] assert len(broker_ids) == 1, "More than one broker found in the kafka cluster?!" # Pull the current broker configuration. describe_configs and alter_configs # generate a dict containing one concurrent.future per queried # ConfigResource, hence the use of .result() - broker = ConfigResource('broker', broker_ids[0]) + broker = ConfigResource("broker", broker_ids[0]) futures = kafka_admin_client.describe_configs([broker]) original_config = futures[broker].result() # Gather the list of settings we need to change in the broker # ConfigResource, and their original values in the to_restore dict to_restore = {} for key, new_value in kafka_server_config_overrides.items(): if key not in original_config: raise ValueError(f"Cannot override unknown configuration {key}") orig_value = original_config[key].value if orig_value == new_value: continue if original_config[key].is_read_only: raise ValueError(f"Cannot override read-only configuration {key}") broker.set_config(key, new_value) to_restore[key] = orig_value # to_restore will be empty if all the config "overrides" are equal to the # original value. No need to wait for a config alteration if that's the # case. The result() will raise a KafkaException if the settings change # failed. if to_restore: futures = kafka_admin_client.alter_configs([broker]) try: futures[broker].result() except Exception: raise yield session_kafka_server # Now we can restore the old setting values. Again, the result() will raise # a KafkaException if the settings change failed. if to_restore: for key, orig_value in to_restore.items(): broker.set_config(key, orig_value) futures = kafka_admin_client.alter_configs([broker]) try: futures[broker].result() except Exception: raise TEST_CONFIG = { - 'consumer_id': 'swh.journal.consumer', - 'object_types': OBJECT_TYPE_KEYS.keys(), - 'stop_after_objects': 1, # will read 1 object and stop - 'storage': {'cls': 'memory', 'args': {}}, + "consumer_id": "swh.journal.consumer", + "object_types": OBJECT_TYPE_KEYS.keys(), + "stop_after_objects": 1, # will read 1 object and stop + "storage": {"cls": "memory", "args": {}}, } @pytest.fixture -def test_config(kafka_server: Tuple[Popen, int], - kafka_prefix: str): +def test_config(kafka_server: Tuple[Popen, int], kafka_prefix: str): """Test configuration needed for producer/consumer """ _, port = kafka_server return { **TEST_CONFIG, - 'brokers': ['127.0.0.1:{}'.format(port)], - 'prefix': kafka_prefix + '.swh.journal.objects', + "brokers": ["127.0.0.1:{}".format(port)], + "prefix": kafka_prefix + ".swh.journal.objects", } @pytest.fixture def consumer( - kafka_server: Tuple[Popen, int], - test_config: Dict, - kafka_consumer_group: str, + kafka_server: Tuple[Popen, int], test_config: Dict, kafka_consumer_group: str, ) -> Consumer: """Get a connected Kafka consumer. """ _, kafka_port = kafka_server - consumer = Consumer({ - 'bootstrap.servers': '127.0.0.1:{}'.format(kafka_port), - 'auto.offset.reset': 'earliest', - 'enable.auto.commit': True, - 'group.id': kafka_consumer_group, - }) + consumer = Consumer( + { + "bootstrap.servers": "127.0.0.1:{}".format(kafka_port), + "auto.offset.reset": "earliest", + "enable.auto.commit": True, + "group.id": kafka_consumer_group, + } + ) kafka_topics = [ - '%s.%s' % (test_config['prefix'], object_type) - for object_type in test_config['object_types'] + "%s.%s" % (test_config["prefix"], object_type) + for object_type in test_config["object_types"] ] consumer.subscribe(kafka_topics) yield consumer consumer.close() def objects_d(): return one_of( - strategies.origins().map( - lambda x: ('origin', x.to_dict())), - strategies.origin_visits().map( - lambda x: ('origin_visit', x.to_dict())), - strategies.snapshots().map( - lambda x: ('snapshot', x.to_dict())), - strategies.releases().map( - lambda x: ('release', x.to_dict())), - strategies.revisions().map( - lambda x: ('revision', x.to_dict())), - strategies.directories().map( - lambda x: ('directory', x.to_dict())), - strategies.skipped_contents().map( - lambda x: ('skipped_content', x.to_dict())), - strategies.present_contents().map( - lambda x: ('content', x.to_dict())), + strategies.origins().map(lambda x: ("origin", x.to_dict())), + strategies.origin_visits().map(lambda x: ("origin_visit", x.to_dict())), + strategies.snapshots().map(lambda x: ("snapshot", x.to_dict())), + strategies.releases().map(lambda x: ("release", x.to_dict())), + strategies.revisions().map(lambda x: ("revision", x.to_dict())), + strategies.directories().map(lambda x: ("directory", x.to_dict())), + strategies.skipped_contents().map(lambda x: ("skipped_content", x.to_dict())), + strategies.present_contents().map(lambda x: ("content", x.to_dict())), ) diff --git a/swh/journal/tests/test_backfill.py b/swh/journal/tests/test_backfill.py index f9fd2b7..07194b0 100644 --- a/swh/journal/tests/test_backfill.py +++ b/swh/journal/tests/test_backfill.py @@ -1,124 +1,160 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest -from swh.journal.backfill import ( - JournalBackfiller, compute_query, PARTITION_KEY -) +from swh.journal.backfill import JournalBackfiller, compute_query, PARTITION_KEY TEST_CONFIG = { - 'brokers': ['localhost'], - 'prefix': 'swh.tmp_journal.new', - 'client_id': 'swh.journal.client.test', - 'storage_dbconn': 'service=swh-dev', + "brokers": ["localhost"], + "prefix": "swh.tmp_journal.new", + "client_id": "swh.journal.client.test", + "storage_dbconn": "service=swh-dev", } def test_config_ko_missing_mandatory_key(): """Missing configuration key will make the initialization fail """ for key in TEST_CONFIG.keys(): config = TEST_CONFIG.copy() config.pop(key) with pytest.raises(ValueError) as e: JournalBackfiller(config) - error = ('Configuration error: The following keys must be' - ' provided: %s' % (','.join([key]), )) + error = "Configuration error: The following keys must be" " provided: %s" % ( + ",".join([key]), + ) assert e.value.args[0] == error def test_config_ko_unknown_object_type(): """Parse arguments will fail if the object type is unknown """ backfiller = JournalBackfiller(TEST_CONFIG) with pytest.raises(ValueError) as e: - backfiller.parse_arguments('unknown-object-type', 1, 2) + backfiller.parse_arguments("unknown-object-type", 1, 2) - error = ('Object type unknown-object-type is not supported. ' - 'The only possible values are %s' % ( - ', '.join(PARTITION_KEY))) + error = ( + "Object type unknown-object-type is not supported. " + "The only possible values are %s" % (", ".join(PARTITION_KEY)) + ) assert e.value.args[0] == error def test_compute_query_content(): - query, where_args, column_aliases = compute_query( - 'content', '\x000000', '\x000001') + query, where_args, column_aliases = compute_query("content", "\x000000", "\x000001") - assert where_args == ['\x000000', '\x000001'] + assert where_args == ["\x000000", "\x000001"] assert column_aliases == [ - 'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', 'status', - 'ctime' + "sha1", + "sha1_git", + "sha256", + "blake2s256", + "length", + "status", + "ctime", ] - assert query == ''' + assert ( + query + == """ select sha1,sha1_git,sha256,blake2s256,length,status,ctime from content where (sha1) >= %s and (sha1) < %s - ''' + """ + ) def test_compute_query_skipped_content(): - query, where_args, column_aliases = compute_query( - 'skipped_content', None, None) + query, where_args, column_aliases = compute_query("skipped_content", None, None) assert where_args == [] assert column_aliases == [ - 'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', 'ctime', - 'status', 'reason', + "sha1", + "sha1_git", + "sha256", + "blake2s256", + "length", + "ctime", + "status", + "reason", ] - assert query == ''' + assert ( + query + == """ select sha1,sha1_git,sha256,blake2s256,length,ctime,status,reason from skipped_content - ''' + """ + ) def test_compute_query_origin_visit(): - query, where_args, column_aliases = compute_query( - 'origin_visit', 1, 10) + query, where_args, column_aliases = compute_query("origin_visit", 1, 10) assert where_args == [1, 10] assert column_aliases == [ - 'visit', 'origin.type', 'origin_visit.type', - 'url', 'date', 'snapshot', 'status', 'metadata', + "visit", + "origin.type", + "origin_visit.type", + "url", + "date", + "snapshot", + "status", + "metadata", ] - assert query == ''' + assert ( + query + == """ select visit,origin.type,origin_visit.type,url,date,snapshot,status,metadata from origin_visit left join origin on origin_visit.origin=origin.id where (origin_visit.origin) >= %s and (origin_visit.origin) < %s - ''' + """ + ) def test_compute_query_release(): - query, where_args, column_aliases = compute_query( - 'release', '\x000002', '\x000003') + query, where_args, column_aliases = compute_query("release", "\x000002", "\x000003") - assert where_args == ['\x000002', '\x000003'] + assert where_args == ["\x000002", "\x000003"] assert column_aliases == [ - 'id', 'date', 'date_offset', 'comment', 'name', 'synthetic', - 'date_neg_utc_offset', 'target', 'target_type', 'author_id', - 'author_name', 'author_email', 'author_fullname'] + "id", + "date", + "date_offset", + "comment", + "name", + "synthetic", + "date_neg_utc_offset", + "target", + "target_type", + "author_id", + "author_name", + "author_email", + "author_fullname", + ] - assert query == ''' + assert ( + query + == """ select release.id as id,date,date_offset,comment,release.name as name,synthetic,date_neg_utc_offset,target,target_type,a.id as author_id,a.name as author_name,a.email as author_email,a.fullname as author_fullname from release left join person a on release.author=a.id where (release.id) >= %s and (release.id) < %s - ''' # noqa + """ # noqa + ) diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py index ab8ef6d..eb19869 100644 --- a/swh/journal/tests/test_cli.py +++ b/swh/journal/tests/test_cli.py @@ -1,639 +1,631 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import Counter import copy import functools import logging import re import tempfile from subprocess import Popen from typing import Any, Dict, Tuple from unittest.mock import patch from click.testing import CliRunner from confluent_kafka import Producer import pytest import yaml from swh.model.hashutil import hash_to_hex from swh.objstorage.backends.in_memory import InMemoryObjStorage from swh.storage import get_storage from swh.journal.cli import cli from swh.journal.replay import CONTENT_REPLAY_RETRIES from swh.journal.serializers import key_to_kafka, value_to_kafka logger = logging.getLogger(__name__) CLI_CONFIG = { - 'storage': { - 'cls': 'memory', - }, - 'objstorage_src': { - 'cls': 'mocked', - 'name': 'src', - }, - 'objstorage_dst': { - 'cls': 'mocked', - 'name': 'dst', - }, + "storage": {"cls": "memory",}, + "objstorage_src": {"cls": "mocked", "name": "src",}, + "objstorage_dst": {"cls": "mocked", "name": "dst",}, } @pytest.fixture def storage(): """An swh-storage object that gets injected into the CLI functions.""" - storage_config = { - 'cls': 'pipeline', - 'steps': [ - {'cls': 'memory'}, - ] - } + storage_config = {"cls": "pipeline", "steps": [{"cls": "memory"},]} storage = get_storage(**storage_config) - with patch('swh.journal.cli.get_storage') as get_storage_mock: + with patch("swh.journal.cli.get_storage") as get_storage_mock: get_storage_mock.return_value = storage yield storage @pytest.fixture def monkeypatch_retry_sleep(monkeypatch): from swh.journal.replay import copy_object, obj_in_objstorage - monkeypatch.setattr(copy_object.retry, 'sleep', lambda x: None) - monkeypatch.setattr(obj_in_objstorage.retry, 'sleep', lambda x: None) + + monkeypatch.setattr(copy_object.retry, "sleep", lambda x: None) + monkeypatch.setattr(obj_in_objstorage.retry, "sleep", lambda x: None) def invoke(*args, env=None, journal_config=None): config = copy.deepcopy(CLI_CONFIG) if journal_config: - config['journal'] = journal_config + config["journal"] = journal_config runner = CliRunner() - with tempfile.NamedTemporaryFile('a', suffix='.yml') as config_fd: + with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd: yaml.dump(config, config_fd) config_fd.seek(0) - args = ['-C' + config_fd.name] + list(args) - return runner.invoke( - cli, args, obj={'log_level': logging.DEBUG}, env=env, - ) + args = ["-C" + config_fd.name] + list(args) + return runner.invoke(cli, args, obj={"log_level": logging.DEBUG}, env=env,) def test_replay( - storage, - kafka_prefix: str, - kafka_consumer_group: str, - kafka_server: Tuple[Popen, int]): + storage, + kafka_prefix: str, + kafka_consumer_group: str, + kafka_server: Tuple[Popen, int], +): (_, kafka_port) = kafka_server - kafka_prefix += '.swh.journal.objects' - - producer = Producer({ - 'bootstrap.servers': 'localhost:{}'.format(kafka_port), - 'client.id': 'test-producer', - 'acks': 'all', - }) - - snapshot = {'id': b'foo', 'branches': { - b'HEAD': { - 'target_type': 'revision', - 'target': b'\x01' * 20, + kafka_prefix += ".swh.journal.objects" + + producer = Producer( + { + "bootstrap.servers": "localhost:{}".format(kafka_port), + "client.id": "test-producer", + "acks": "all", } - }} # type: Dict[str, Any] + ) + + snapshot = { + "id": b"foo", + "branches": {b"HEAD": {"target_type": "revision", "target": b"\x01" * 20,}}, + } # type: Dict[str, Any] producer.produce( - topic=kafka_prefix + '.snapshot', - key=key_to_kafka(snapshot['id']), + topic=kafka_prefix + ".snapshot", + key=key_to_kafka(snapshot["id"]), value=value_to_kafka(snapshot), ) producer.flush() - logger.debug('Flushed producer') + logger.debug("Flushed producer") result = invoke( - 'replay', - '--stop-after-objects', '1', + "replay", + "--stop-after-objects", + "1", journal_config={ - 'brokers': ['127.0.0.1:%d' % kafka_port], - 'group_id': kafka_consumer_group, - 'prefix': kafka_prefix, + "brokers": ["127.0.0.1:%d" % kafka_port], + "group_id": kafka_consumer_group, + "prefix": kafka_prefix, }, ) - expected = r'Done.\n' + expected = r"Done.\n" assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output - assert storage.snapshot_get(snapshot['id']) == { - **snapshot, 'next_branch': None} + assert storage.snapshot_get(snapshot["id"]) == {**snapshot, "next_branch": None} def _patch_objstorages(names): objstorages = {name: InMemoryObjStorage() for name in names} def get_mock_objstorage(cls, **args): - assert cls == 'mocked', cls - return objstorages[args['name']] + assert cls == "mocked", cls + return objstorages[args["name"]] def decorator(f): @functools.wraps(f) - @patch('swh.journal.cli.get_objstorage') + @patch("swh.journal.cli.get_objstorage") def newf(get_objstorage_mock, *args, **kwargs): get_objstorage_mock.side_effect = get_mock_objstorage f(*args, objstorages=objstorages, **kwargs) return newf return decorator NUM_CONTENTS = 10 def _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages): - producer = Producer({ - 'bootstrap.servers': '127.0.0.1:{}'.format(kafka_port), - 'client.id': 'test-producer', - 'acks': 'all', - }) + producer = Producer( + { + "bootstrap.servers": "127.0.0.1:{}".format(kafka_port), + "client.id": "test-producer", + "acks": "all", + } + ) contents = {} for i in range(NUM_CONTENTS): - content = b'\x00' * 19 + bytes([i]) - sha1 = objstorages['src'].add(content) + content = b"\x00" * 19 + bytes([i]) + sha1 = objstorages["src"].add(content) contents[sha1] = content producer.produce( - topic=kafka_prefix + '.content', + topic=kafka_prefix + ".content", key=key_to_kafka(sha1), - value=key_to_kafka({ - 'sha1': sha1, - 'status': 'visible', - }), + value=key_to_kafka({"sha1": sha1, "status": "visible",}), ) producer.flush() return contents -@_patch_objstorages(['src', 'dst']) +@_patch_objstorages(["src", "dst"]) def test_replay_content( - objstorages, - storage, - kafka_prefix: str, - kafka_consumer_group: str, - kafka_server: Tuple[Popen, int]): + objstorages, + storage, + kafka_prefix: str, + kafka_consumer_group: str, + kafka_server: Tuple[Popen, int], +): (_, kafka_port) = kafka_server - kafka_prefix += '.swh.journal.objects' + kafka_prefix += ".swh.journal.objects" - contents = _fill_objstorage_and_kafka( - kafka_port, kafka_prefix, objstorages) + contents = _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages) result = invoke( - 'content-replay', - '--stop-after-objects', str(NUM_CONTENTS), + "content-replay", + "--stop-after-objects", + str(NUM_CONTENTS), journal_config={ - 'brokers': ['127.0.0.1:%d' % kafka_port], - 'group_id': kafka_consumer_group, - 'prefix': kafka_prefix, + "brokers": ["127.0.0.1:%d" % kafka_port], + "group_id": kafka_consumer_group, + "prefix": kafka_prefix, }, ) - expected = r'Done.\n' + expected = r"Done.\n" assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output for (sha1, content) in contents.items(): - assert sha1 in objstorages['dst'], sha1 - assert objstorages['dst'].get(sha1) == content + assert sha1 in objstorages["dst"], sha1 + assert objstorages["dst"].get(sha1) == content -@_patch_objstorages(['src', 'dst']) +@_patch_objstorages(["src", "dst"]) def test_replay_content_structured_log( - objstorages, - storage, - kafka_prefix: str, - kafka_consumer_group: str, - kafka_server: Tuple[Popen, int], - caplog): + objstorages, + storage, + kafka_prefix: str, + kafka_consumer_group: str, + kafka_server: Tuple[Popen, int], + caplog, +): (_, kafka_port) = kafka_server - kafka_prefix += '.swh.journal.objects' + kafka_prefix += ".swh.journal.objects" - contents = _fill_objstorage_and_kafka( - kafka_port, kafka_prefix, objstorages) + contents = _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages) - caplog.set_level(logging.DEBUG, 'swh.journal.replay') + caplog.set_level(logging.DEBUG, "swh.journal.replay") expected_obj_ids = set(hash_to_hex(sha1) for sha1 in contents) result = invoke( - 'content-replay', - '--stop-after-objects', str(NUM_CONTENTS), + "content-replay", + "--stop-after-objects", + str(NUM_CONTENTS), journal_config={ - 'brokers': ['127.0.0.1:%d' % kafka_port], - 'group_id': kafka_consumer_group, - 'prefix': kafka_prefix, + "brokers": ["127.0.0.1:%d" % kafka_port], + "group_id": kafka_consumer_group, + "prefix": kafka_prefix, }, ) - expected = r'Done.\n' + expected = r"Done.\n" assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output copied = set() for record in caplog.records: logtext = record.getMessage() - if 'copied' in logtext: - copied.add(record.args['obj_id']) + if "copied" in logtext: + copied.add(record.args["obj_id"]) - assert copied == expected_obj_ids, ( - "Mismatched logging; see captured log output for details." - ) + assert ( + copied == expected_obj_ids + ), "Mismatched logging; see captured log output for details." -@_patch_objstorages(['src', 'dst']) +@_patch_objstorages(["src", "dst"]) def test_replay_content_static_group_id( - objstorages, - storage, - kafka_prefix: str, - kafka_consumer_group: str, - kafka_server: Tuple[Popen, int], - caplog): + objstorages, + storage, + kafka_prefix: str, + kafka_consumer_group: str, + kafka_server: Tuple[Popen, int], + caplog, +): (_, kafka_port) = kafka_server - kafka_prefix += '.swh.journal.objects' + kafka_prefix += ".swh.journal.objects" - contents = _fill_objstorage_and_kafka( - kafka_port, kafka_prefix, objstorages) + contents = _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages) # Setup log capture to fish the consumer settings out of the log messages - caplog.set_level(logging.DEBUG, 'swh.journal.client') + caplog.set_level(logging.DEBUG, "swh.journal.client") result = invoke( - 'content-replay', - '--stop-after-objects', str(NUM_CONTENTS), - env={'KAFKA_GROUP_INSTANCE_ID': 'static-group-instance-id'}, + "content-replay", + "--stop-after-objects", + str(NUM_CONTENTS), + env={"KAFKA_GROUP_INSTANCE_ID": "static-group-instance-id"}, journal_config={ - 'brokers': ['127.0.0.1:%d' % kafka_port], - 'group_id': kafka_consumer_group, - 'prefix': kafka_prefix, + "brokers": ["127.0.0.1:%d" % kafka_port], + "group_id": kafka_consumer_group, + "prefix": kafka_prefix, }, ) - expected = r'Done.\n' + expected = r"Done.\n" assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output consumer_settings = None for record in caplog.records: - if 'Consumer settings' in record.message: + if "Consumer settings" in record.message: consumer_settings = record.args break assert consumer_settings is not None, ( - 'Failed to get consumer settings out of the consumer log. ' - 'See log capture for details.' + "Failed to get consumer settings out of the consumer log. " + "See log capture for details." ) - assert consumer_settings['group.instance.id'] == 'static-group-instance-id' - assert consumer_settings['session.timeout.ms'] == 60 * 10 * 1000 - assert consumer_settings['max.poll.interval.ms'] == 90 * 10 * 1000 + assert consumer_settings["group.instance.id"] == "static-group-instance-id" + assert consumer_settings["session.timeout.ms"] == 60 * 10 * 1000 + assert consumer_settings["max.poll.interval.ms"] == 90 * 10 * 1000 for (sha1, content) in contents.items(): - assert sha1 in objstorages['dst'], sha1 - assert objstorages['dst'].get(sha1) == content + assert sha1 in objstorages["dst"], sha1 + assert objstorages["dst"].get(sha1) == content -@_patch_objstorages(['src', 'dst']) +@_patch_objstorages(["src", "dst"]) def test_replay_content_exclude( - objstorages, - storage, - kafka_prefix: str, - kafka_consumer_group: str, - kafka_server: Tuple[Popen, int]): + objstorages, + storage, + kafka_prefix: str, + kafka_consumer_group: str, + kafka_server: Tuple[Popen, int], +): (_, kafka_port) = kafka_server - kafka_prefix += '.swh.journal.objects' + kafka_prefix += ".swh.journal.objects" - contents = _fill_objstorage_and_kafka( - kafka_port, kafka_prefix, objstorages) + contents = _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages) excluded_contents = list(contents)[0::2] # picking half of them - with tempfile.NamedTemporaryFile(mode='w+b') as fd: - fd.write(b''.join(sorted(excluded_contents))) + with tempfile.NamedTemporaryFile(mode="w+b") as fd: + fd.write(b"".join(sorted(excluded_contents))) fd.seek(0) result = invoke( - 'content-replay', - '--stop-after-objects', str(NUM_CONTENTS), - '--exclude-sha1-file', fd.name, + "content-replay", + "--stop-after-objects", + str(NUM_CONTENTS), + "--exclude-sha1-file", + fd.name, journal_config={ - 'brokers': ['127.0.0.1:%d' % kafka_port], - 'group_id': kafka_consumer_group, - 'prefix': kafka_prefix, + "brokers": ["127.0.0.1:%d" % kafka_port], + "group_id": kafka_consumer_group, + "prefix": kafka_prefix, }, ) - expected = r'Done.\n' + expected = r"Done.\n" assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output for (sha1, content) in contents.items(): if sha1 in excluded_contents: - assert sha1 not in objstorages['dst'], sha1 + assert sha1 not in objstorages["dst"], sha1 else: - assert sha1 in objstorages['dst'], sha1 - assert objstorages['dst'].get(sha1) == content + assert sha1 in objstorages["dst"], sha1 + assert objstorages["dst"].get(sha1) == content NUM_CONTENTS_DST = 5 -@_patch_objstorages(['src', 'dst']) -@pytest.mark.parametrize("check_dst,expected_copied,expected_in_dst", [ - (True, NUM_CONTENTS - NUM_CONTENTS_DST, NUM_CONTENTS_DST), - (False, NUM_CONTENTS, 0), -]) +@_patch_objstorages(["src", "dst"]) +@pytest.mark.parametrize( + "check_dst,expected_copied,expected_in_dst", + [ + (True, NUM_CONTENTS - NUM_CONTENTS_DST, NUM_CONTENTS_DST), + (False, NUM_CONTENTS, 0), + ], +) def test_replay_content_check_dst( - objstorages, - storage, - kafka_prefix: str, - kafka_consumer_group: str, - kafka_server: Tuple[Popen, int], - check_dst: bool, - expected_copied: int, - expected_in_dst: int, - caplog): + objstorages, + storage, + kafka_prefix: str, + kafka_consumer_group: str, + kafka_server: Tuple[Popen, int], + check_dst: bool, + expected_copied: int, + expected_in_dst: int, + caplog, +): (_, kafka_port) = kafka_server - kafka_prefix += '.swh.journal.objects' + kafka_prefix += ".swh.journal.objects" - contents = _fill_objstorage_and_kafka( - kafka_port, kafka_prefix, objstorages) + contents = _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages) for i, (sha1, content) in enumerate(contents.items()): if i >= NUM_CONTENTS_DST: break - objstorages['dst'].add(content, obj_id=sha1) + objstorages["dst"].add(content, obj_id=sha1) - caplog.set_level(logging.DEBUG, 'swh.journal.replay') + caplog.set_level(logging.DEBUG, "swh.journal.replay") result = invoke( - 'content-replay', - '--stop-after-objects', str(NUM_CONTENTS), - '--check-dst' if check_dst else '--no-check-dst', + "content-replay", + "--stop-after-objects", + str(NUM_CONTENTS), + "--check-dst" if check_dst else "--no-check-dst", journal_config={ - 'brokers': ['127.0.0.1:%d' % kafka_port], - 'group_id': kafka_consumer_group, - 'prefix': kafka_prefix, + "brokers": ["127.0.0.1:%d" % kafka_port], + "group_id": kafka_consumer_group, + "prefix": kafka_prefix, }, ) - expected = r'Done.\n' + expected = r"Done.\n" assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output copied = 0 in_dst = 0 for record in caplog.records: logtext = record.getMessage() - if 'copied' in logtext: + if "copied" in logtext: copied += 1 - elif 'in dst' in logtext: + elif "in dst" in logtext: in_dst += 1 - assert (copied == expected_copied and in_dst == expected_in_dst), ( - "Unexpected amount of objects copied, see the captured log for details" - ) + assert ( + copied == expected_copied and in_dst == expected_in_dst + ), "Unexpected amount of objects copied, see the captured log for details" for (sha1, content) in contents.items(): - assert sha1 in objstorages['dst'], sha1 - assert objstorages['dst'].get(sha1) == content + assert sha1 in objstorages["dst"], sha1 + assert objstorages["dst"].get(sha1) == content class FlakyObjStorage(InMemoryObjStorage): def __init__(self, *args, **kwargs): - state = kwargs.pop('state') - self.failures_left = Counter(kwargs.pop('failures')) + state = kwargs.pop("state") + self.failures_left = Counter(kwargs.pop("failures")) super().__init__(*args, **kwargs) if state: self.state = state def flaky_operation(self, op, obj_id): if self.failures_left[op, obj_id] > 0: self.failures_left[op, obj_id] -= 1 - raise RuntimeError( - 'Failed %s on %s' % (op, hash_to_hex(obj_id)) - ) + raise RuntimeError("Failed %s on %s" % (op, hash_to_hex(obj_id))) def get(self, obj_id): - self.flaky_operation('get', obj_id) + self.flaky_operation("get", obj_id) return super().get(obj_id) def add(self, data, obj_id=None, check_presence=True): - self.flaky_operation('add', obj_id) - return super().add(data, obj_id=obj_id, - check_presence=check_presence) + self.flaky_operation("add", obj_id) + return super().add(data, obj_id=obj_id, check_presence=check_presence) def __contains__(self, obj_id): - self.flaky_operation('in', obj_id) + self.flaky_operation("in", obj_id) return super().__contains__(obj_id) -@_patch_objstorages(['src', 'dst']) +@_patch_objstorages(["src", "dst"]) def test_replay_content_check_dst_retry( - objstorages, - storage, - kafka_prefix: str, - kafka_consumer_group: str, - kafka_server: Tuple[Popen, int], - monkeypatch_retry_sleep): + objstorages, + storage, + kafka_prefix: str, + kafka_consumer_group: str, + kafka_server: Tuple[Popen, int], + monkeypatch_retry_sleep, +): (_, kafka_port) = kafka_server - kafka_prefix += '.swh.journal.objects' + kafka_prefix += ".swh.journal.objects" - contents = _fill_objstorage_and_kafka( - kafka_port, kafka_prefix, objstorages) + contents = _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages) failures = {} for i, (sha1, content) in enumerate(contents.items()): if i >= NUM_CONTENTS_DST: break - objstorages['dst'].add(content, obj_id=sha1) - failures['in', sha1] = 1 + objstorages["dst"].add(content, obj_id=sha1) + failures["in", sha1] = 1 - orig_dst = objstorages['dst'] - objstorages['dst'] = FlakyObjStorage(state=orig_dst.state, - failures=failures) + orig_dst = objstorages["dst"] + objstorages["dst"] = FlakyObjStorage(state=orig_dst.state, failures=failures) result = invoke( - 'content-replay', - '--stop-after-objects', str(NUM_CONTENTS), - '--check-dst', + "content-replay", + "--stop-after-objects", + str(NUM_CONTENTS), + "--check-dst", journal_config={ - 'brokers': ['127.0.0.1:%d' % kafka_port], - 'group_id': kafka_consumer_group, - 'prefix': kafka_prefix, + "brokers": ["127.0.0.1:%d" % kafka_port], + "group_id": kafka_consumer_group, + "prefix": kafka_prefix, }, ) - expected = r'Done.\n' + expected = r"Done.\n" assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output for (sha1, content) in contents.items(): - assert sha1 in objstorages['dst'], sha1 - assert objstorages['dst'].get(sha1) == content + assert sha1 in objstorages["dst"], sha1 + assert objstorages["dst"].get(sha1) == content -@_patch_objstorages(['src', 'dst']) +@_patch_objstorages(["src", "dst"]) def test_replay_content_failed_copy_retry( - objstorages, - storage, - kafka_prefix: str, - kafka_consumer_group: str, - kafka_server: Tuple[Popen, int], - caplog, - monkeypatch_retry_sleep): + objstorages, + storage, + kafka_prefix: str, + kafka_consumer_group: str, + kafka_server: Tuple[Popen, int], + caplog, + monkeypatch_retry_sleep, +): (_, kafka_port) = kafka_server - kafka_prefix += '.swh.journal.objects' + kafka_prefix += ".swh.journal.objects" - contents = _fill_objstorage_and_kafka( - kafka_port, kafka_prefix, objstorages) + contents = _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages) add_failures = {} get_failures = {} definitely_failed = set() # We want to generate operations failing 1 to CONTENT_REPLAY_RETRIES times. # We generate failures for 2 different operations, get and add. num_retry_contents = 2 * CONTENT_REPLAY_RETRIES - assert num_retry_contents < NUM_CONTENTS, ( - "Need to generate more test contents to properly test retry behavior" - ) + assert ( + num_retry_contents < NUM_CONTENTS + ), "Need to generate more test contents to properly test retry behavior" for i, sha1 in enumerate(contents): if i >= num_retry_contents: break # This generates a number of failures, up to CONTENT_REPLAY_RETRIES num_failures = (i % CONTENT_REPLAY_RETRIES) + 1 # This generates failures of add for the first CONTENT_REPLAY_RETRIES # objects, then failures of get. if i < CONTENT_REPLAY_RETRIES: - add_failures['add', sha1] = num_failures + add_failures["add", sha1] = num_failures else: - get_failures['get', sha1] = num_failures + get_failures["get", sha1] = num_failures # Only contents that have CONTENT_REPLAY_RETRIES or more are # definitely failing if num_failures >= CONTENT_REPLAY_RETRIES: definitely_failed.add(hash_to_hex(sha1)) - objstorages['dst'] = FlakyObjStorage( - state=objstorages['dst'].state, - failures=add_failures, + objstorages["dst"] = FlakyObjStorage( + state=objstorages["dst"].state, failures=add_failures, ) - objstorages['src'] = FlakyObjStorage( - state=objstorages['src'].state, - failures=get_failures, + objstorages["src"] = FlakyObjStorage( + state=objstorages["src"].state, failures=get_failures, ) - caplog.set_level(logging.DEBUG, 'swh.journal.replay') + caplog.set_level(logging.DEBUG, "swh.journal.replay") result = invoke( - 'content-replay', - '--stop-after-objects', str(NUM_CONTENTS), + "content-replay", + "--stop-after-objects", + str(NUM_CONTENTS), journal_config={ - 'brokers': ['127.0.0.1:%d' % kafka_port], - 'group_id': kafka_consumer_group, - 'prefix': kafka_prefix, + "brokers": ["127.0.0.1:%d" % kafka_port], + "group_id": kafka_consumer_group, + "prefix": kafka_prefix, }, ) - expected = r'Done.\n' + expected = r"Done.\n" assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output copied = 0 actually_failed = set() for record in caplog.records: logtext = record.getMessage() - if 'copied' in logtext: + if "copied" in logtext: copied += 1 - elif 'Failed operation' in logtext: + elif "Failed operation" in logtext: assert record.levelno == logging.ERROR - assert record.args['retries'] == CONTENT_REPLAY_RETRIES - actually_failed.add(record.args['obj_id']) + assert record.args["retries"] == CONTENT_REPLAY_RETRIES + actually_failed.add(record.args["obj_id"]) - assert actually_failed == definitely_failed, ( - 'Unexpected object copy failures; see captured log for details' - ) + assert ( + actually_failed == definitely_failed + ), "Unexpected object copy failures; see captured log for details" for (sha1, content) in contents.items(): if hash_to_hex(sha1) in definitely_failed: - assert sha1 not in objstorages['dst'] + assert sha1 not in objstorages["dst"] continue - assert sha1 in objstorages['dst'], sha1 - assert objstorages['dst'].get(sha1) == content + assert sha1 in objstorages["dst"], sha1 + assert objstorages["dst"].get(sha1) == content -@_patch_objstorages(['src', 'dst']) +@_patch_objstorages(["src", "dst"]) def test_replay_content_objnotfound( - objstorages, - storage, - kafka_prefix: str, - kafka_consumer_group: str, - kafka_server: Tuple[Popen, int], - caplog): + objstorages, + storage, + kafka_prefix: str, + kafka_consumer_group: str, + kafka_server: Tuple[Popen, int], + caplog, +): (_, kafka_port) = kafka_server - kafka_prefix += '.swh.journal.objects' + kafka_prefix += ".swh.journal.objects" - contents = _fill_objstorage_and_kafka( - kafka_port, kafka_prefix, objstorages) + contents = _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages) num_contents_deleted = 5 contents_deleted = set() for i, sha1 in enumerate(contents): if i >= num_contents_deleted: break - del objstorages['src'].state[sha1] + del objstorages["src"].state[sha1] contents_deleted.add(hash_to_hex(sha1)) - caplog.set_level(logging.DEBUG, 'swh.journal.replay') + caplog.set_level(logging.DEBUG, "swh.journal.replay") result = invoke( - 'content-replay', - '--stop-after-objects', str(NUM_CONTENTS), + "content-replay", + "--stop-after-objects", + str(NUM_CONTENTS), journal_config={ - 'brokers': ['127.0.0.1:%d' % kafka_port], - 'group_id': kafka_consumer_group, - 'prefix': kafka_prefix, + "brokers": ["127.0.0.1:%d" % kafka_port], + "group_id": kafka_consumer_group, + "prefix": kafka_prefix, }, ) - expected = r'Done.\n' + expected = r"Done.\n" assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output copied = 0 not_in_src = set() for record in caplog.records: logtext = record.getMessage() - if 'copied' in logtext: + if "copied" in logtext: copied += 1 - elif 'object not found' in logtext: + elif "object not found" in logtext: # Check that the object id can be recovered from logs assert record.levelno == logging.ERROR - not_in_src.add(record.args['obj_id']) + not_in_src.add(record.args["obj_id"]) - assert copied == NUM_CONTENTS - num_contents_deleted, ( - "Unexpected number of contents copied" - ) + assert ( + copied == NUM_CONTENTS - num_contents_deleted + ), "Unexpected number of contents copied" - assert not_in_src == contents_deleted, ( - "Mismatch between deleted contents and not_in_src logs" - ) + assert ( + not_in_src == contents_deleted + ), "Mismatch between deleted contents and not_in_src logs" for (sha1, content) in contents.items(): - if sha1 not in objstorages['src']: + if sha1 not in objstorages["src"]: continue - assert sha1 in objstorages['dst'], sha1 - assert objstorages['dst'].get(sha1) == content + assert sha1 in objstorages["dst"], sha1 + assert objstorages["dst"].get(sha1) == content diff --git a/swh/journal/tests/test_client.py b/swh/journal/tests/test_client.py index 122f304..f0bd3cd 100644 --- a/swh/journal/tests/test_client.py +++ b/swh/journal/tests/test_client.py @@ -1,138 +1,144 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from subprocess import Popen from typing import Dict, List, Tuple from unittest.mock import MagicMock from confluent_kafka import Producer import pytest from swh.model.hypothesis_strategies import revisions from swh.model.model import Content from swh.journal.client import JournalClient from swh.journal.serializers import key_to_kafka, value_to_kafka def test_client( - kafka_prefix: str, - kafka_consumer_group: str, - kafka_server: Tuple[Popen, int]): + kafka_prefix: str, kafka_consumer_group: str, kafka_server: Tuple[Popen, int] +): (_, port) = kafka_server - kafka_prefix += '.swh.journal.objects' - - producer = Producer({ - 'bootstrap.servers': 'localhost:{}'.format(port), - 'client.id': 'test producer', - 'acks': 'all', - }) + kafka_prefix += ".swh.journal.objects" + + producer = Producer( + { + "bootstrap.servers": "localhost:{}".format(port), + "client.id": "test producer", + "acks": "all", + } + ) rev = revisions().example() # Fill Kafka producer.produce( - topic=kafka_prefix + '.revision', key=key_to_kafka(rev.id), + topic=kafka_prefix + ".revision", + key=key_to_kafka(rev.id), value=value_to_kafka(rev.to_dict()), ) producer.flush() client = JournalClient( - brokers='localhost:%d' % kafka_server[1], + brokers="localhost:%d" % kafka_server[1], group_id=kafka_consumer_group, prefix=kafka_prefix, stop_after_objects=1, ) worker_fn = MagicMock() client.process(worker_fn) - worker_fn.assert_called_once_with({'revision': [rev.to_dict()]}) + worker_fn.assert_called_once_with({"revision": [rev.to_dict()]}) def test_client_eof( - kafka_prefix: str, - kafka_consumer_group: str, - kafka_server: Tuple[Popen, int]): + kafka_prefix: str, kafka_consumer_group: str, kafka_server: Tuple[Popen, int] +): (_, port) = kafka_server - kafka_prefix += '.swh.journal.objects' - - producer = Producer({ - 'bootstrap.servers': 'localhost:{}'.format(port), - 'client.id': 'test producer', - 'acks': 'all', - }) + kafka_prefix += ".swh.journal.objects" + + producer = Producer( + { + "bootstrap.servers": "localhost:{}".format(port), + "client.id": "test producer", + "acks": "all", + } + ) rev = revisions().example() # Fill Kafka producer.produce( - topic=kafka_prefix + '.revision', key=key_to_kafka(rev.id), + topic=kafka_prefix + ".revision", + key=key_to_kafka(rev.id), value=value_to_kafka(rev.to_dict()), ) producer.flush() client = JournalClient( - brokers='localhost:%d' % kafka_server[1], + brokers="localhost:%d" % kafka_server[1], group_id=kafka_consumer_group, prefix=kafka_prefix, stop_after_objects=None, stop_on_eof=True, ) worker_fn = MagicMock() client.process(worker_fn) - worker_fn.assert_called_once_with({'revision': [rev.to_dict()]}) + worker_fn.assert_called_once_with({"revision": [rev.to_dict()]}) @pytest.mark.parametrize("batch_size", [1, 5, 100]) def test_client_batch_size( - kafka_prefix: str, - kafka_consumer_group: str, - kafka_server: Tuple[Popen, int], - batch_size: int, + kafka_prefix: str, + kafka_consumer_group: str, + kafka_server: Tuple[Popen, int], + batch_size: int, ): (_, port) = kafka_server - kafka_prefix += '.swh.journal.objects' + kafka_prefix += ".swh.journal.objects" num_objects = 2 * batch_size + 1 assert num_objects < 256, "Too many objects, generation will fail" - producer = Producer({ - 'bootstrap.servers': 'localhost:{}'.format(port), - 'client.id': 'test producer', - 'acks': 'all', - }) + producer = Producer( + { + "bootstrap.servers": "localhost:{}".format(port), + "client.id": "test producer", + "acks": "all", + } + ) contents = [Content.from_data(bytes([i])) for i in range(num_objects)] # Fill Kafka for content in contents: producer.produce( - topic=kafka_prefix + '.content', + topic=kafka_prefix + ".content", key=key_to_kafka(content.sha1), value=value_to_kafka(content.to_dict()), ) producer.flush() client = JournalClient( - brokers=['localhost:%d' % kafka_server[1]], + brokers=["localhost:%d" % kafka_server[1]], group_id=kafka_consumer_group, prefix=kafka_prefix, stop_after_objects=num_objects, batch_size=batch_size, ) collected_output: List[Dict] = [] def worker_fn(objects): - received = objects['content'] + received = objects["content"] assert len(received) <= batch_size collected_output.extend(received) client.process(worker_fn) assert collected_output == [content.to_dict() for content in contents] diff --git a/swh/journal/tests/test_kafka_writer.py b/swh/journal/tests/test_kafka_writer.py index 0e32675..eb77867 100644 --- a/swh/journal/tests/test_kafka_writer.py +++ b/swh/journal/tests/test_kafka_writer.py @@ -1,178 +1,165 @@ # Copyright (C) 2018-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict import datetime from confluent_kafka import Consumer, KafkaException from subprocess import Popen from typing import List, Tuple from swh.storage import get_storage from swh.journal.replay import object_converter_fn -from swh.journal.serializers import ( - kafka_to_key, kafka_to_value -) +from swh.journal.serializers import kafka_to_key, kafka_to_value from swh.journal.writer.kafka import KafkaJournalWriter, OBJECT_TYPES from swh.model.model import Content, Origin, BaseModel from .conftest import OBJECT_TYPE_KEYS MODEL_OBJECTS = {v: k for (k, v) in OBJECT_TYPES.items()} def consume_messages(consumer, kafka_prefix, expected_messages): """Consume expected_messages from the consumer; Sort them all into a consumed_objects dict""" consumed_messages = defaultdict(list) fetched_messages = 0 retries_left = 1000 while fetched_messages < expected_messages: if retries_left == 0: - raise ValueError('Timed out fetching messages from kafka') + raise ValueError("Timed out fetching messages from kafka") msg = consumer.poll(timeout=0.01) if not msg: retries_left -= 1 continue error = msg.error() if error is not None: if error.fatal(): raise KafkaException(error) retries_left -= 1 continue fetched_messages += 1 topic = msg.topic() - assert topic.startswith(kafka_prefix + '.'), "Unexpected topic" - object_type = topic[len(kafka_prefix + '.'):] + assert topic.startswith(kafka_prefix + "."), "Unexpected topic" + object_type = topic[len(kafka_prefix + ".") :] consumed_messages[object_type].append( (kafka_to_key(msg.key()), kafka_to_value(msg.value())) ) return consumed_messages def assert_all_objects_consumed(consumed_messages): """Check whether all objects from OBJECT_TYPE_KEYS have been consumed""" for (object_type, (key_name, objects)) in OBJECT_TYPE_KEYS.items(): (keys, values) = zip(*consumed_messages[object_type]) if key_name: assert list(keys) == [object_[key_name] for object_ in objects] else: pass # TODO - if object_type == 'origin_visit': + if object_type == "origin_visit": for value in values: - del value['visit'] - elif object_type == 'content': + del value["visit"] + elif object_type == "content": for value in values: - del value['ctime'] + del value["ctime"] for object_ in objects: assert object_ in values def test_kafka_writer( - kafka_prefix: str, - kafka_server: Tuple[Popen, int], - consumer: Consumer): - kafka_prefix += '.swh.journal.objects' + kafka_prefix: str, kafka_server: Tuple[Popen, int], consumer: Consumer +): + kafka_prefix += ".swh.journal.objects" writer = KafkaJournalWriter( - brokers=[f'localhost:{kafka_server[1]}'], - client_id='kafka_writer', + brokers=[f"localhost:{kafka_server[1]}"], + client_id="kafka_writer", prefix=kafka_prefix, - producer_config={ - 'message.max.bytes': 100000000, - }) + producer_config={"message.max.bytes": 100000000,}, + ) expected_messages = 0 for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items(): for (num, object_d) in enumerate(objects): - if object_type == 'origin_visit': - object_d = {**object_d, 'visit': num} - if object_type == 'content': - object_d = {**object_d, 'ctime': datetime.datetime.now()} + if object_type == "origin_visit": + object_d = {**object_d, "visit": num} + if object_type == "content": + object_d = {**object_d, "ctime": datetime.datetime.now()} object_ = MODEL_OBJECTS[object_type].from_dict(object_d) writer.write_addition(object_type, object_) expected_messages += 1 - consumed_messages = consume_messages( - consumer, kafka_prefix, expected_messages - ) + consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages) assert_all_objects_consumed(consumed_messages) def test_storage_direct_writer( - kafka_prefix: str, - kafka_server: Tuple[Popen, int], - consumer: Consumer): - kafka_prefix += '.swh.journal.objects' + kafka_prefix: str, kafka_server: Tuple[Popen, int], consumer: Consumer +): + kafka_prefix += ".swh.journal.objects" writer_config = { - 'cls': 'kafka', - 'brokers': ['localhost:%d' % kafka_server[1]], - 'client_id': 'kafka_writer', - 'prefix': kafka_prefix, - 'producer_config': { - 'message.max.bytes': 100000000, - } + "cls": "kafka", + "brokers": ["localhost:%d" % kafka_server[1]], + "client_id": "kafka_writer", + "prefix": kafka_prefix, + "producer_config": {"message.max.bytes": 100000000,}, } storage_config = { - 'cls': 'pipeline', - 'steps': [ - {'cls': 'memory', 'journal_writer': writer_config}, - ] + "cls": "pipeline", + "steps": [{"cls": "memory", "journal_writer": writer_config},], } storage = get_storage(**storage_config) expected_messages = 0 for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items(): - method = getattr(storage, object_type + '_add') - if object_type in ('content', 'directory', 'revision', 'release', - 'snapshot', 'origin'): + method = getattr(storage, object_type + "_add") + if object_type in ( + "content", + "directory", + "revision", + "release", + "snapshot", + "origin", + ): objects_: List[BaseModel] - if object_type == 'content': - objects_ = [ - Content.from_dict({ - **obj, 'data': b''}) - for obj in objects - ] + if object_type == "content": + objects_ = [Content.from_dict({**obj, "data": b""}) for obj in objects] else: - objects_ = [ - object_converter_fn[object_type](obj) - for obj in objects - ] + objects_ = [object_converter_fn[object_type](obj) for obj in objects] method(objects_) expected_messages += len(objects) - elif object_type in ('origin_visit',): + elif object_type in ("origin_visit",): for object_ in objects: object_ = object_.copy() - origin_url = object_.pop('origin') + origin_url = object_.pop("origin") storage.origin_add_one(Origin(url=origin_url)) - visit = method(origin_url, date=object_.pop('date'), - type=object_.pop('type')) + visit = method( + origin_url, date=object_.pop("date"), type=object_.pop("type") + ) expected_messages += 1 storage.origin_visit_update(origin_url, visit.visit, **object_) expected_messages += 1 else: assert False, object_type - consumed_messages = consume_messages( - consumer, kafka_prefix, expected_messages - ) + consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages) assert_all_objects_consumed(consumed_messages) diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py index dc2653e..ae9ae0c 100644 --- a/swh/journal/tests/test_replay.py +++ b/swh/journal/tests/test_replay.py @@ -1,417 +1,414 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import functools import logging import random from subprocess import Popen from typing import Dict, List, Tuple import dateutil import pytest from confluent_kafka import Producer from hypothesis import strategies, given, settings from swh.storage import get_storage from swh.journal.client import JournalClient from swh.journal.serializers import key_to_kafka, value_to_kafka from swh.journal.replay import process_replay_objects, is_hash_in_bytearray from swh.model.hashutil import hash_to_hex from swh.model.model import Content from .conftest import OBJECT_TYPE_KEYS, DUPLICATE_CONTENTS from .utils import MockedJournalClient, MockedKafkaWriter -storage_config = { - 'cls': 'pipeline', - 'steps': [ - {'cls': 'memory'}, - ] -} +storage_config = {"cls": "pipeline", "steps": [{"cls": "memory"},]} def make_topic(kafka_prefix: str, object_type: str) -> str: - return kafka_prefix + '.' + object_type + return kafka_prefix + "." + object_type def test_storage_play( - kafka_prefix: str, - kafka_consumer_group: str, - kafka_server: Tuple[Popen, int], - caplog): + kafka_prefix: str, + kafka_consumer_group: str, + kafka_server: Tuple[Popen, int], + caplog, +): """Optimal replayer scenario. This: - writes objects to the topic - replayer consumes objects from the topic and replay them """ (_, port) = kafka_server - kafka_prefix += '.swh.journal.objects' + kafka_prefix += ".swh.journal.objects" storage = get_storage(**storage_config) - producer = Producer({ - 'bootstrap.servers': 'localhost:{}'.format(port), - 'client.id': 'test producer', - 'acks': 'all', - }) + producer = Producer( + { + "bootstrap.servers": "localhost:{}".format(port), + "client.id": "test producer", + "acks": "all", + } + ) now = datetime.datetime.now(tz=datetime.timezone.utc) # Fill Kafka nb_sent = 0 nb_visits = 0 for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items(): topic = make_topic(kafka_prefix, object_type) for object_ in objects: key = bytes(random.randint(0, 255) for _ in range(40)) object_ = object_.copy() - if object_type == 'content': - object_['ctime'] = now - elif object_type == 'origin_visit': + if object_type == "content": + object_["ctime"] = now + elif object_type == "origin_visit": nb_visits += 1 - object_['visit'] = nb_visits + object_["visit"] = nb_visits producer.produce( - topic=topic, key=key_to_kafka(key), - value=value_to_kafka(object_), + topic=topic, key=key_to_kafka(key), value=value_to_kafka(object_), ) nb_sent += 1 producer.flush() - caplog.set_level(logging.ERROR, 'swh.journal.replay') + caplog.set_level(logging.ERROR, "swh.journal.replay") # Fill the storage from Kafka replayer = JournalClient( - brokers='localhost:%d' % kafka_server[1], + brokers="localhost:%d" % kafka_server[1], group_id=kafka_consumer_group, prefix=kafka_prefix, stop_after_objects=nb_sent, ) worker_fn = functools.partial(process_replay_objects, storage=storage) nb_inserted = 0 while nb_inserted < nb_sent: nb_inserted += replayer.process(worker_fn) assert nb_sent == nb_inserted # Check the objects were actually inserted in the storage - assert OBJECT_TYPE_KEYS['revision'][1] == \ - list(storage.revision_get( - [rev['id'] for rev in OBJECT_TYPE_KEYS['revision'][1]])) - assert OBJECT_TYPE_KEYS['release'][1] == \ - list(storage.release_get( - [rel['id'] for rel in OBJECT_TYPE_KEYS['release'][1]])) - - origins = list(storage.origin_get( - [orig for orig in OBJECT_TYPE_KEYS['origin'][1]])) - assert OBJECT_TYPE_KEYS['origin'][1] == \ - [{'url': orig['url']} for orig in origins] + assert OBJECT_TYPE_KEYS["revision"][1] == list( + storage.revision_get([rev["id"] for rev in OBJECT_TYPE_KEYS["revision"][1]]) + ) + assert OBJECT_TYPE_KEYS["release"][1] == list( + storage.release_get([rel["id"] for rel in OBJECT_TYPE_KEYS["release"][1]]) + ) + + origins = list(storage.origin_get([orig for orig in OBJECT_TYPE_KEYS["origin"][1]])) + assert OBJECT_TYPE_KEYS["origin"][1] == [{"url": orig["url"]} for orig in origins] for origin in origins: - origin_url = origin['url'] + origin_url = origin["url"] expected_visits = [ { **visit, - 'origin': origin_url, - 'date': dateutil.parser.parse(visit['date']), + "origin": origin_url, + "date": dateutil.parser.parse(visit["date"]), } - for visit in OBJECT_TYPE_KEYS['origin_visit'][1] - if visit['origin'] == origin['url'] + for visit in OBJECT_TYPE_KEYS["origin_visit"][1] + if visit["origin"] == origin["url"] ] - actual_visits = list(storage.origin_visit_get( - origin_url)) + actual_visits = list(storage.origin_visit_get(origin_url)) for visit in actual_visits: - del visit['visit'] # opaque identifier + del visit["visit"] # opaque identifier assert expected_visits == actual_visits - input_contents = OBJECT_TYPE_KEYS['content'][1] - contents = storage.content_get_metadata( - [cont['sha1'] for cont in input_contents]) + input_contents = OBJECT_TYPE_KEYS["content"][1] + contents = storage.content_get_metadata([cont["sha1"] for cont in input_contents]) assert len(contents) == len(input_contents) - assert contents == {cont['sha1']: [cont] for cont in input_contents} + assert contents == {cont["sha1"]: [cont] for cont in input_contents} collision = 0 for record in caplog.records: logtext = record.getMessage() - if 'Colliding contents:' in logtext: + if "Colliding contents:" in logtext: collision += 1 assert collision == 0, "No collision should be detected" def test_storage_play_with_collision( - kafka_prefix: str, - kafka_consumer_group: str, - kafka_server: Tuple[Popen, int], - caplog): + kafka_prefix: str, + kafka_consumer_group: str, + kafka_server: Tuple[Popen, int], + caplog, +): """Another replayer scenario with collisions. This: - writes objects to the topic, including colliding contents - replayer consumes objects from the topic and replay them - This drops the colliding contents from the replay when detected """ (_, port) = kafka_server - kafka_prefix += '.swh.journal.objects' + kafka_prefix += ".swh.journal.objects" storage = get_storage(**storage_config) - producer = Producer({ - 'bootstrap.servers': 'localhost:{}'.format(port), - 'client.id': 'test producer', - 'enable.idempotence': 'true', - }) + producer = Producer( + { + "bootstrap.servers": "localhost:{}".format(port), + "client.id": "test producer", + "enable.idempotence": "true", + } + ) now = datetime.datetime.now(tz=datetime.timezone.utc) # Fill Kafka nb_sent = 0 nb_visits = 0 for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items(): topic = make_topic(kafka_prefix, object_type) for object_ in objects: key = bytes(random.randint(0, 255) for _ in range(40)) object_ = object_.copy() - if object_type == 'content': - object_['ctime'] = now - elif object_type == 'origin_visit': + if object_type == "content": + object_["ctime"] = now + elif object_type == "origin_visit": nb_visits += 1 - object_['visit'] = nb_visits + object_["visit"] = nb_visits producer.produce( - topic=topic, key=key_to_kafka(key), - value=value_to_kafka(object_), + topic=topic, key=key_to_kafka(key), value=value_to_kafka(object_), ) nb_sent += 1 # Create collision in input data # They are not written in the destination for content in DUPLICATE_CONTENTS: - topic = make_topic(kafka_prefix, 'content') + topic = make_topic(kafka_prefix, "content") producer.produce( - topic=topic, key=key_to_kafka(key), - value=value_to_kafka(content), + topic=topic, key=key_to_kafka(key), value=value_to_kafka(content), ) nb_sent += 1 producer.flush() - caplog.set_level(logging.ERROR, 'swh.journal.replay') + caplog.set_level(logging.ERROR, "swh.journal.replay") # Fill the storage from Kafka replayer = JournalClient( - brokers='localhost:%d' % kafka_server[1], + brokers="localhost:%d" % kafka_server[1], group_id=kafka_consumer_group, prefix=kafka_prefix, stop_after_objects=nb_sent, ) worker_fn = functools.partial(process_replay_objects, storage=storage) nb_inserted = 0 while nb_inserted < nb_sent: nb_inserted += replayer.process(worker_fn) assert nb_sent == nb_inserted # Check the objects were actually inserted in the storage - assert OBJECT_TYPE_KEYS['revision'][1] == \ - list(storage.revision_get( - [rev['id'] for rev in OBJECT_TYPE_KEYS['revision'][1]])) - assert OBJECT_TYPE_KEYS['release'][1] == \ - list(storage.release_get( - [rel['id'] for rel in OBJECT_TYPE_KEYS['release'][1]])) - - origins = list(storage.origin_get( - [orig for orig in OBJECT_TYPE_KEYS['origin'][1]])) - assert OBJECT_TYPE_KEYS['origin'][1] == \ - [{'url': orig['url']} for orig in origins] + assert OBJECT_TYPE_KEYS["revision"][1] == list( + storage.revision_get([rev["id"] for rev in OBJECT_TYPE_KEYS["revision"][1]]) + ) + assert OBJECT_TYPE_KEYS["release"][1] == list( + storage.release_get([rel["id"] for rel in OBJECT_TYPE_KEYS["release"][1]]) + ) + + origins = list(storage.origin_get([orig for orig in OBJECT_TYPE_KEYS["origin"][1]])) + assert OBJECT_TYPE_KEYS["origin"][1] == [{"url": orig["url"]} for orig in origins] for origin in origins: - origin_url = origin['url'] + origin_url = origin["url"] expected_visits = [ { **visit, - 'origin': origin_url, - 'date': dateutil.parser.parse(visit['date']), + "origin": origin_url, + "date": dateutil.parser.parse(visit["date"]), } - for visit in OBJECT_TYPE_KEYS['origin_visit'][1] - if visit['origin'] == origin['url'] + for visit in OBJECT_TYPE_KEYS["origin_visit"][1] + if visit["origin"] == origin["url"] ] - actual_visits = list(storage.origin_visit_get( - origin_url)) + actual_visits = list(storage.origin_visit_get(origin_url)) for visit in actual_visits: - del visit['visit'] # opaque identifier + del visit["visit"] # opaque identifier assert expected_visits == actual_visits - input_contents = OBJECT_TYPE_KEYS['content'][1] - contents = storage.content_get_metadata( - [cont['sha1'] for cont in input_contents]) + input_contents = OBJECT_TYPE_KEYS["content"][1] + contents = storage.content_get_metadata([cont["sha1"] for cont in input_contents]) assert len(contents) == len(input_contents) - assert contents == {cont['sha1']: [cont] for cont in input_contents} + assert contents == {cont["sha1"]: [cont] for cont in input_contents} nb_collisions = 0 actual_collision: Dict for record in caplog.records: logtext = record.getMessage() - if 'Collision detected:' in logtext: + if "Collision detected:" in logtext: nb_collisions += 1 - actual_collision = record.args['collision'] + actual_collision = record.args["collision"] assert nb_collisions == 1, "1 collision should be detected" - algo = 'sha1' - assert actual_collision['algo'] == algo + algo = "sha1" + assert actual_collision["algo"] == algo expected_colliding_hash = hash_to_hex(DUPLICATE_CONTENTS[0][algo]) - assert actual_collision['hash'] == expected_colliding_hash + assert actual_collision["hash"] == expected_colliding_hash - actual_colliding_hashes = actual_collision['objects'] + actual_colliding_hashes = actual_collision["objects"] assert len(actual_colliding_hashes) == len(DUPLICATE_CONTENTS) for content in DUPLICATE_CONTENTS: expected_content_hashes = { - k: hash_to_hex(v) - for k, v in Content.from_dict(content).hashes().items() + k: hash_to_hex(v) for k, v in Content.from_dict(content).hashes().items() } assert expected_content_hashes in actual_colliding_hashes def _test_write_replay_origin_visit(visits: List[Dict]): """Helper function to write tests for origin_visit. Each visit (a dict) given in the 'visits' argument will be sent to a (mocked) kafka queue, which a in-memory-storage backed replayer is listening to. Check that corresponding origin visits entities are present in the storage and have correct values if they are not skipped. """ queue: List = [] replayer = MockedJournalClient(queue) writer = MockedKafkaWriter(queue) # Note that flipping the order of these two insertions will crash # the test, because the legacy origin_format does not allow to create # the origin when needed (type is missing) - writer.send('origin', 'foo', { - 'url': 'http://example.com/', - 'type': 'git', # test the legacy origin format is accepted - }) + writer.send( + "origin", + "foo", + { + "url": "http://example.com/", + "type": "git", # test the legacy origin format is accepted + }, + ) for visit in visits: - writer.send('origin_visit', 'foo', visit) + writer.send("origin_visit", "foo", visit) queue_size = len(queue) assert replayer.stop_after_objects is None replayer.stop_after_objects = queue_size storage = get_storage(**storage_config) worker_fn = functools.partial(process_replay_objects, storage=storage) replayer.process(worker_fn) - actual_visits = list(storage.origin_visit_get('http://example.com/')) + actual_visits = list(storage.origin_visit_get("http://example.com/")) assert len(actual_visits) == len(visits), actual_visits for vin, vout in zip(visits, actual_visits): vin = vin.copy() vout = vout.copy() - assert vout.pop('origin') == 'http://example.com/' - vin.pop('origin') - vin.setdefault('type', 'git') - vin.setdefault('metadata', None) + assert vout.pop("origin") == "http://example.com/" + vin.pop("origin") + vin.setdefault("type", "git") + vin.setdefault("metadata", None) assert vin == vout def test_write_replay_origin_visit(): """Test origin_visit when the 'origin' is just a string.""" now = datetime.datetime.now() - visits = [{ - 'visit': 1, - 'origin': 'http://example.com/', - 'date': now, - 'type': 'git', - 'status': 'partial', - 'snapshot': None, - }] + visits = [ + { + "visit": 1, + "origin": "http://example.com/", + "date": now, + "type": "git", + "status": "partial", + "snapshot": None, + } + ] _test_write_replay_origin_visit(visits) def test_write_replay_legacy_origin_visit1(): """Origin_visit with no types should make the replayer crash We expect the journal's origin_visit topic to no longer reference such visits. If it does, the replayer must crash so we can fix the journal's topic. """ now = datetime.datetime.now() visit = { - 'visit': 1, - 'origin': 'http://example.com/', - 'date': now, - 'status': 'partial', - 'snapshot': None, + "visit": 1, + "origin": "http://example.com/", + "date": now, + "status": "partial", + "snapshot": None, } now2 = datetime.datetime.now() visit2 = { - 'visit': 2, - 'origin': {'url': 'http://example.com/'}, - 'date': now2, - 'status': 'partial', - 'snapshot': None, + "visit": 2, + "origin": {"url": "http://example.com/"}, + "date": now2, + "status": "partial", + "snapshot": None, } for origin_visit in [visit, visit2]: - with pytest.raises(ValueError, match='Old origin visit format'): + with pytest.raises(ValueError, match="Old origin visit format"): _test_write_replay_origin_visit([origin_visit]) def test_write_replay_legacy_origin_visit2(): """Test origin_visit when 'type' is missing from the visit, but not from the origin.""" now = datetime.datetime.now() - visits = [{ - 'visit': 1, - 'origin': { - 'url': 'http://example.com/', - 'type': 'git', - }, - 'date': now, - 'type': 'git', - 'status': 'partial', - 'snapshot': None, - }] + visits = [ + { + "visit": 1, + "origin": {"url": "http://example.com/", "type": "git",}, + "date": now, + "type": "git", + "status": "partial", + "snapshot": None, + } + ] _test_write_replay_origin_visit(visits) def test_write_replay_legacy_origin_visit3(): """Test origin_visit when the origin is a dict""" now = datetime.datetime.now() - visits = [{ - 'visit': 1, - 'origin': { - 'url': 'http://example.com/', - }, - 'date': now, - 'type': 'git', - 'status': 'partial', - 'snapshot': None, - }] + visits = [ + { + "visit": 1, + "origin": {"url": "http://example.com/",}, + "date": now, + "type": "git", + "status": "partial", + "snapshot": None, + } + ] _test_write_replay_origin_visit(visits) hash_strategy = strategies.binary(min_size=20, max_size=20) @settings(max_examples=500) -@given(strategies.sets(hash_strategy, min_size=0, max_size=500), - strategies.sets(hash_strategy, min_size=10)) +@given( + strategies.sets(hash_strategy, min_size=0, max_size=500), + strategies.sets(hash_strategy, min_size=10), +) def test_is_hash_in_bytearray(haystack, needles): - array = b''.join(sorted(haystack)) + array = b"".join(sorted(haystack)) needles |= haystack # Exhaustively test for all objects in the array for needle in needles: - assert is_hash_in_bytearray(needle, array, len(haystack)) == \ - (needle in haystack) + assert is_hash_in_bytearray(needle, array, len(haystack)) == ( + needle in haystack + ) diff --git a/swh/journal/tests/test_serializers.py b/swh/journal/tests/test_serializers.py index 9d4bdd4..b5f0421 100644 --- a/swh/journal/tests/test_serializers.py +++ b/swh/journal/tests/test_serializers.py @@ -1,29 +1,29 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import OrderedDict import itertools import unittest from swh.journal import serializers class TestSerializers(unittest.TestCase): def test_key_to_kafka_repeatable(self): """Check the kafka key encoding is repeatable""" base_dict = { - 'a': 'foo', - 'b': 'bar', - 'c': 'baz', + "a": "foo", + "b": "bar", + "c": "baz", } key = serializers.key_to_kafka(base_dict) for dict_keys in itertools.permutations(base_dict): d = OrderedDict() for k in dict_keys: d[k] = base_dict[k] self.assertEqual(key, serializers.key_to_kafka(d)) diff --git a/swh/journal/tests/test_write_replay.py b/swh/journal/tests/test_write_replay.py index 5e39eb8..3c4ffe6 100644 --- a/swh/journal/tests/test_write_replay.py +++ b/swh/journal/tests/test_write_replay.py @@ -1,160 +1,165 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import functools from unittest.mock import patch import attr from hypothesis import given, settings, HealthCheck from hypothesis.strategies import lists from swh.model.hypothesis_strategies import present_contents from swh.model.model import Origin from swh.storage import get_storage from swh.storage.exc import HashCollision from swh.journal.replay import ( - process_replay_objects, process_replay_objects_content, object_converter_fn + process_replay_objects, + process_replay_objects_content, + object_converter_fn, ) from .utils import MockedJournalClient, MockedKafkaWriter from .conftest import objects_d storage_config = { - 'cls': 'memory', - 'journal_writer': {'cls': 'memory'}, + "cls": "memory", + "journal_writer": {"cls": "memory"}, } def empty_person_name_email(rev_or_rel): """Empties the 'name' and 'email' fields of the author/committer fields of a revision or release; leaving only the fullname.""" - if getattr(rev_or_rel, 'author', None): + if getattr(rev_or_rel, "author", None): rev_or_rel = attr.evolve( - rev_or_rel, - author=attr.evolve( - rev_or_rel.author, - name=b'', - email=b'', - ) + rev_or_rel, author=attr.evolve(rev_or_rel.author, name=b"", email=b"",) ) - if getattr(rev_or_rel, 'committer', None): + if getattr(rev_or_rel, "committer", None): rev_or_rel = attr.evolve( rev_or_rel, - committer=attr.evolve( - rev_or_rel.committer, - name=b'', - email=b'', - ) + committer=attr.evolve(rev_or_rel.committer, name=b"", email=b"",), ) return rev_or_rel @given(lists(objects_d(), min_size=1)) @settings(suppress_health_check=[HealthCheck.too_slow]) def test_write_replay_same_order_batches(objects): queue = [] replayer = MockedJournalClient(queue) - with patch('swh.journal.writer.inmemory.InMemoryJournalWriter', - return_value=MockedKafkaWriter(queue)): + with patch( + "swh.journal.writer.inmemory.InMemoryJournalWriter", + return_value=MockedKafkaWriter(queue), + ): storage1 = get_storage(**storage_config) # Write objects to storage1 for (obj_type, obj) in objects: - if obj_type == 'content' and obj.get('status') == 'absent': - obj_type = 'skipped_content' + if obj_type == "content" and obj.get("status") == "absent": + obj_type = "skipped_content" obj = object_converter_fn[obj_type](obj) - if obj_type == 'origin_visit': + if obj_type == "origin_visit": storage1.origin_add_one(Origin(url=obj.origin)) storage1.origin_visit_upsert([obj]) else: - method = getattr(storage1, obj_type + '_add') + method = getattr(storage1, obj_type + "_add") try: method([obj]) except HashCollision: pass # Bail out early if we didn't insert any relevant objects... queue_size = len(queue) assert queue_size != 0, "No test objects found; hypothesis strategy bug?" assert replayer.stop_after_objects is None replayer.stop_after_objects = queue_size storage2 = get_storage(**storage_config) worker_fn = functools.partial(process_replay_objects, storage=storage2) replayer.process(worker_fn) assert replayer.consumer.committed - for attr_name in ('_contents', '_directories', - '_snapshots', '_origin_visits', '_origins'): - assert getattr(storage1, attr_name) == getattr(storage2, attr_name), \ - attr_name + for attr_name in ( + "_contents", + "_directories", + "_snapshots", + "_origin_visits", + "_origins", + ): + assert getattr(storage1, attr_name) == getattr(storage2, attr_name), attr_name # When hypothesis generates a revision and a release with same # author (or committer) fullname but different name or email, then # the storage will use the first name/email it sees. # This first one will be either the one from the revision or the release, # and since there is no order guarantees, storage2 has 1/2 chance of # not seeing the same order as storage1, therefore we need to strip # them out before comparing. - for attr_name in ('_revisions', '_releases'): - items1 = {k: empty_person_name_email(v) - for (k, v) in getattr(storage1, attr_name).items()} - items2 = {k: empty_person_name_email(v) - for (k, v) in getattr(storage2, attr_name).items()} + for attr_name in ("_revisions", "_releases"): + items1 = { + k: empty_person_name_email(v) + for (k, v) in getattr(storage1, attr_name).items() + } + items2 = { + k: empty_person_name_email(v) + for (k, v) in getattr(storage2, attr_name).items() + } assert items1 == items2, attr_name # TODO: add test for hash collision @given(lists(present_contents(), min_size=1)) @settings(suppress_health_check=[HealthCheck.too_slow]) def test_write_replay_content(objects): queue = [] replayer = MockedJournalClient(queue) - with patch('swh.journal.writer.inmemory.InMemoryJournalWriter', - return_value=MockedKafkaWriter(queue)): + with patch( + "swh.journal.writer.inmemory.InMemoryJournalWriter", + return_value=MockedKafkaWriter(queue), + ): storage1 = get_storage(**storage_config) contents = [] for obj in objects: storage1.content_add([obj]) contents.append(obj) # Bail out early if we didn't insert any relevant objects... queue_size = len(queue) assert queue_size != 0, "No test objects found; hypothesis strategy bug?" assert replayer.stop_after_objects is None replayer.stop_after_objects = queue_size storage2 = get_storage(**storage_config) objstorage1 = storage1.objstorage.objstorage objstorage2 = storage2.objstorage.objstorage - worker_fn = functools.partial(process_replay_objects_content, - src=objstorage1, - dst=objstorage2) + worker_fn = functools.partial( + process_replay_objects_content, src=objstorage1, dst=objstorage2 + ) replayer.process(worker_fn) # only content with status visible will be copied in storage2 expected_objstorage_state = { - c.sha1: c.data for c in contents if c.status == 'visible' + c.sha1: c.data for c in contents if c.status == "visible" } assert expected_objstorage_state == objstorage2.state diff --git a/swh/journal/tests/utils.py b/swh/journal/tests/utils.py index 48d1d92..ee492ef 100644 --- a/swh/journal/tests/utils.py +++ b/swh/journal/tests/utils.py @@ -1,79 +1,79 @@ from swh.journal.client import JournalClient, ACCEPTED_OBJECT_TYPES from swh.journal.writer.kafka import KafkaJournalWriter -from swh.journal.serializers import (kafka_to_value, key_to_kafka, - value_to_kafka) +from swh.journal.serializers import kafka_to_value, key_to_kafka, value_to_kafka class FakeKafkaMessage: def __init__(self, topic, key, value): self._topic = topic self._key = key_to_kafka(key) self._value = value_to_kafka(value) def topic(self): return self._topic def value(self): return self._value def key(self): return self._key def error(self): return None class MockedKafkaWriter(KafkaJournalWriter): def __init__(self, queue): - self._prefix = 'prefix' + self._prefix = "prefix" self.queue = queue def send(self, topic, key, value): msg = FakeKafkaMessage(topic=topic, key=key, value=value) self.queue.append(msg) def flush(self): pass class MockedKafkaConsumer: """Mimic the confluent_kafka.Consumer API, producing the messages stored in `queue`. You're only allowed to subscribe to topics in which the queue has messages. """ + def __init__(self, queue): self.queue = queue self.committed = False def consume(self, num_messages, timeout=None): L = self.queue[0:num_messages] self.queue[0:num_messages] = [] return L def commit(self): if self.queue == []: self.committed = True def list_topics(self, timeout=None): return set(message.topic() for message in self.queue) def subscribe(self, topics): unknown_topics = set(topics) - self.list_topics() if unknown_topics: - raise ValueError('Unknown topics %s' % ', '.join(unknown_topics)) + raise ValueError("Unknown topics %s" % ", ".join(unknown_topics)) def close(self): pass class MockedJournalClient(JournalClient): def __init__(self, queue, object_types=ACCEPTED_OBJECT_TYPES): self._object_types = object_types self.consumer = MockedKafkaConsumer(queue) self.process_timeout = None self.stop_after_objects = None self.value_deserializer = kafka_to_value self.stop_on_eof = False self.batch_size = 200 diff --git a/swh/journal/writer/__init__.py b/swh/journal/writer/__init__.py index ef09e56..0cea174 100644 --- a/swh/journal/writer/__init__.py +++ b/swh/journal/writer/__init__.py @@ -1,27 +1,29 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import warnings def get_journal_writer(cls, **kwargs): - if 'args' in kwargs: + if "args" in kwargs: warnings.warn( 'Explicit "args" key is deprecated, use keys directly instead.', - DeprecationWarning) - kwargs = kwargs['args'] + DeprecationWarning, + ) + kwargs = kwargs["args"] - if cls == 'inmemory': # FIXME: Remove inmemory in due time - warnings.warn("cls = 'inmemory' is deprecated, use 'memory' instead", - DeprecationWarning) - cls = 'memory' - if cls == 'memory': + if cls == "inmemory": # FIXME: Remove inmemory in due time + warnings.warn( + "cls = 'inmemory' is deprecated, use 'memory' instead", DeprecationWarning + ) + cls = "memory" + if cls == "memory": from .inmemory import InMemoryJournalWriter as JournalWriter - elif cls == 'kafka': + elif cls == "kafka": from .kafka import KafkaJournalWriter as JournalWriter else: - raise ValueError('Unknown journal writer class `%s`' % cls) + raise ValueError("Unknown journal writer class `%s`" % cls) return JournalWriter(**kwargs) diff --git a/tox.ini b/tox.ini index 1a0a86d..e8aa391 100644 --- a/tox.ini +++ b/tox.ini @@ -1,33 +1,40 @@ [tox] -envlist=flake8,mypy,py3 +envlist=black,flake8,mypy,py3 [testenv] passenv=SWH_KAFKA_ROOT extras = testing deps = pytest-cov dev: pdbpp setenv = SWH_KAFKA_ROOT = {env:SWH_KAFKA_ROOT:swh/journal/tests/kafka} commands = pytest --cov={envsitepackagesdir}/swh/journal \ {envsitepackagesdir}/swh/journal \ --cov-branch \ --doctest-modules {posargs} +[testenv:black] +skip_install = true +deps = + black +commands = + {envpython} -m black --check swh + [testenv:flake8] skip_install = true deps = git+https://github.com/PyCQA/pyflakes.git flake8 commands = {envpython} -m flake8 [testenv:mypy] extras = testing deps = mypy commands = mypy swh