diff --git a/PKG-INFO b/PKG-INFO index 45b0bb3..bab4758 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,70 +1,70 @@ Metadata-Version: 2.1 Name: swh.journal -Version: 0.0.13 +Version: 0.0.14 Summary: Software Heritage Journal utilities Home-page: https://forge.softwareheritage.org/diffusion/DJNL/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN -Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Source, https://forge.softwareheritage.org/source/swh-journal +Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Description: swh-journal =========== Persistent logger of changes to the archive, with publish-subscribe support. See the [documentation](https://docs.softwareheritage.org/devel/swh-journal/index.html#software-heritage-journal) for more details. # Local test As a pre-requisite, you need a kakfa installation path. The following target will take care of this: ``` make install ``` Then, provided you are in the right virtual environment as described in the [swh getting-started](https://docs.softwareheritage.org/devel/developer-setup.html#developer-setup): ``` pytest ``` or: ``` tox ``` # Running ## publisher Command: ``` $ swh-journal --config-file ~/.config/swh/journal/publisher.yml \ publisher ``` # Auto-completion To have the completion, add the following in your ~/.virtualenvs/swh/bin/postactivate: ``` eval "$(_SWH_JOURNAL_COMPLETE=$autocomplete_cmd swh-journal)" ``` Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/debian/changelog b/debian/changelog index 279fbdf..5294620 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,114 +1,118 @@ -swh-journal (0.0.13-1~swh1~bpo10+1) buster-swh; urgency=medium +swh-journal (0.0.14-1~swh1) unstable-swh; urgency=medium - * Rebuild for buster-swh + * New upstream release 0.0.14 - (tagged by David Douard + on 2019-07-18 13:38:39 +0200) + * Upstream changes: - 0.0.14 - Code of conduct - fix the + backfiller - fix compatibility with click < 7 - make the + replayer robust against old formats - -- Nicolas Dandrimont Thu, 11 Jul 2019 14:20:40 +0200 + -- Software Heritage autobuilder (on jenkins-debian1) Thu, 18 Jul 2019 11:44:40 +0000 swh-journal (0.0.13-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.13 - (tagged by Antoine R. 
Dumont (@ardumont) on 2019-07-03 10:26:29 +0200) * Upstream changes: - v0.0.13 - cli: Document depreated options -- Software Heritage autobuilder (on jenkins-debian1) Wed, 03 Jul 2019 08:33:57 +0000 swh-journal (0.0.12-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.12 - (tagged by Valentin Lorentz on 2019-07-02 11:58:00 +0200) * Upstream changes: - v0.0.12 - More CLI option - Replay parallelism - Fix build on Debian 9 -- Software Heritage autobuilder (on jenkins-debian1) Tue, 02 Jul 2019 10:08:07 +0000 swh-journal (0.0.11-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.11 - (tagged by David Douard on 2019-06-12 13:58:14 +0200) * Upstream changes: - v0.0.11 -- Software Heritage autobuilder (on jenkins-debian1) Wed, 12 Jun 2019 12:10:52 +0000 swh-journal (0.0.10-1~swh2) unstable-swh; urgency=medium * Disable tests at build-time -- Nicolas Dandrimont Thu, 09 May 2019 14:42:24 +0200 swh-journal (0.0.10-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.10 - (tagged by Nicolas Dandrimont on 2019-05-09 14:29:52 +0200) * Upstream changes: - Release swh.journal v0.0.10 - Remove the publisher component, introduce the backfiller component. -- Software Heritage autobuilder (on jenkins-debian1) Thu, 09 May 2019 12:34:36 +0000 swh-journal (0.0.9-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.9 - (tagged by David Douard on 2019-04-10 13:42:32 +0200) * Upstream changes: - v0.0.9 -- Software Heritage autobuilder (on jenkins-debian1) Wed, 10 Apr 2019 11:48:39 +0000 swh-journal (0.0.8-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.8 - (tagged by Antoine R. Dumont (@ardumont) on 2019-03-15 13:56:57 +0100) * Upstream changes: - v0.0.8 - Add swh-journal cli -- Software Heritage autobuilder (on jenkins-debian1) Fri, 15 Mar 2019 13:00:11 +0000 swh-journal (0.0.7-1~swh2) unstable-swh; urgency=low * New release fixing build dependencies -- Antoine Romain Dumont (@ardumont) Tue, 19 Feb 2019 14:18:06 +0100 swh-journal (0.0.7-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.7 - (tagged by Antoine R. Dumont (@ardumont) on 2019-01-11 11:53:44 +0100) * Upstream changes: - v0.0.7 - Fix off-by-one error when checking max_messages. - tests: Adapt tests according to latest in-memory storage changes -- Software Heritage autobuilder (on jenkins-debian1) Fri, 11 Jan 2019 10:56:46 +0000 swh-journal (0.0.4-1~swh1) unstable-swh; urgency=medium * Release swh.journal version 0.0.4 * Update packaging runes -- Nicolas Dandrimont Thu, 12 Oct 2017 19:01:53 +0200 swh-journal (0.0.3-1~swh1) unstable-swh; urgency=medium * Release swh.journal v0.0.3 * Prepare building for stretch -- Nicolas Dandrimont Fri, 30 Jun 2017 17:29:15 +0200 swh-journal (0.0.2-1~swh1) unstable-swh; urgency=medium * v0.0.2 * Adapt swh.journal.publisher * Adapt swh.journal.client * Add swh.journal.checker basic implementation (reads and sends all * objects to publisher's subscribed queues). -- Antoine R. Dumont (@ardumont) Fri, 24 Mar 2017 12:54:16 +0100 swh-journal (0.0.1-1~swh1) unstable-swh; urgency=medium * Initial release * v0.0.1 * Add a journal publisher * Add a base class interface for journal clients -- Antoine R. 
Dumont (@ardumont) Tue, 21 Mar 2017 14:38:13 +0100 diff --git a/requirements-swh.txt b/requirements-swh.txt index 003cbc1..a11813c 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,2 +1,2 @@ swh.core[db,http] >= 0.0.60 -swh.storage >= 0.0.141 +swh.storage >= 0.0.147 diff --git a/swh.journal.egg-info/PKG-INFO b/swh.journal.egg-info/PKG-INFO index 45b0bb3..bab4758 100644 --- a/swh.journal.egg-info/PKG-INFO +++ b/swh.journal.egg-info/PKG-INFO @@ -1,70 +1,70 @@ Metadata-Version: 2.1 Name: swh.journal -Version: 0.0.13 +Version: 0.0.14 Summary: Software Heritage Journal utilities Home-page: https://forge.softwareheritage.org/diffusion/DJNL/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN -Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Source, https://forge.softwareheritage.org/source/swh-journal +Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Description: swh-journal =========== Persistent logger of changes to the archive, with publish-subscribe support. See the [documentation](https://docs.softwareheritage.org/devel/swh-journal/index.html#software-heritage-journal) for more details. # Local test As a pre-requisite, you need a kakfa installation path. The following target will take care of this: ``` make install ``` Then, provided you are in the right virtual environment as described in the [swh getting-started](https://docs.softwareheritage.org/devel/developer-setup.html#developer-setup): ``` pytest ``` or: ``` tox ``` # Running ## publisher Command: ``` $ swh-journal --config-file ~/.config/swh/journal/publisher.yml \ publisher ``` # Auto-completion To have the completion, add the following in your ~/.virtualenvs/swh/bin/postactivate: ``` eval "$(_SWH_JOURNAL_COMPLETE=$autocomplete_cmd swh-journal)" ``` Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/swh.journal.egg-info/SOURCES.txt b/swh.journal.egg-info/SOURCES.txt index 227939f..5b07d12 100644 --- a/swh.journal.egg-info/SOURCES.txt +++ b/swh.journal.egg-info/SOURCES.txt @@ -1,29 +1,30 @@ MANIFEST.in Makefile README.md requirements-swh.txt requirements.txt setup.py version.txt swh/__init__.py swh.journal.egg-info/PKG-INFO swh.journal.egg-info/SOURCES.txt swh.journal.egg-info/dependency_links.txt swh.journal.egg-info/entry_points.txt swh.journal.egg-info/requires.txt swh.journal.egg-info/top_level.txt swh/journal/__init__.py swh/journal/backfill.py swh/journal/cli.py swh/journal/client.py swh/journal/direct_writer.py swh/journal/replay.py swh/journal/serializers.py swh/journal/tests/__init__.py swh/journal/tests/conftest.py swh/journal/tests/test_backfill.py swh/journal/tests/test_cli.py swh/journal/tests/test_direct_writer.py swh/journal/tests/test_replay.py swh/journal/tests/test_serializers.py -swh/journal/tests/test_write_replay.py \ No newline at end of file +swh/journal/tests/test_write_replay.py +swh/journal/tests/utils.py \ No newline at end of file diff --git a/swh.journal.egg-info/requires.txt b/swh.journal.egg-info/requires.txt index be56d97..0cad31a 100644 --- a/swh.journal.egg-info/requires.txt +++ b/swh.journal.egg-info/requires.txt @@ 
-1,11 +1,11 @@ kafka-python>=1.3 msgpack vcversioner swh.core[db,http]>=0.0.60 -swh.storage>=0.0.141 +swh.storage>=0.0.147 [testing] pytest swh.model>=0.0.34 pytest-kafka hypothesis diff --git a/swh/journal/backfill.py b/swh/journal/backfill.py index 0c53222..eea5ed7 100644 --- a/swh/journal/backfill.py +++ b/swh/journal/backfill.py @@ -1,453 +1,453 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Module defining journal backfiller classes. Those backfiller goal is to produce back part or all of the objects from the storage to the journal topics At the moment, a first naive implementation is the JournalBackfiller. It simply reads the objects from the storage and sends every object identifier back to the journal. """ import logging from .direct_writer import DirectKafkaWriter from swh.core.db import BaseDb from swh.storage.converters import db_to_release, db_to_revision logger = logging.getLogger(__name__) PARTITION_KEY = { 'content': 'sha1', 'skipped_content': None, # unused 'directory': 'id', 'revision': 'revision.id', 'release': 'release.id', 'snapshot': 'id', 'origin': 'id', 'origin_visit': 'origin_visit.origin', } COLUMNS = { 'content': [ 'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', 'status', 'ctime' ], 'skipped_content': [ 'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', 'ctime', 'status', 'reason', ], 'directory': ['id', 'dir_entries', 'file_entries', 'rev_entries'], 'revision': [ ("revision.id", "id"), "date", "date_offset", "committer_date", "committer_date_offset", "type", "directory", "message", "synthetic", "metadata", "date_neg_utc_offset", "committer_date_neg_utc_offset", ("array(select parent_id::bytea from revision_history rh " "where rh.id = revision.id order by rh.parent_rank asc)", "parents"), ("a.id", "author_id"), ("a.name", "author_name"), ("a.email", "author_email"), ("a.fullname", "author_fullname"), ("c.id", "committer_id"), ("c.name", "committer_name"), ("c.email", "committer_email"), ("c.fullname", "committer_fullname"), ], 'release': [ ("release.id", "id"), "date", "date_offset", "comment", ("release.name", "name"), "synthetic", "date_neg_utc_offset", "target", "target_type", ("a.id", "author_id"), ("a.name", "author_name"), ("a.email", "author_email"), ("a.fullname", "author_fullname"), ], 'snapshot': ['id', 'object_id'], 'origin': ['type', 'url'], - 'origin_visit': ['type', 'url', 'date', 'snapshot', 'status'], + 'origin_visit': ['visit', 'type', 'url', 'date', 'snapshot', 'status'], } JOINS = { 'release': ['person a on release.author=a.id'], 'revision': ['person a on revision.author=a.id', 'person c on revision.committer=c.id'], 'origin_visit': ['origin on origin_visit.origin=origin.id'], } def directory_converter(db, directory): """Convert directory from the flat representation to swh model compatible objects. 
""" columns = ['target', 'name', 'perms'] query_template = ''' select %(columns)s from directory_entry_%(type)s where id in %%s ''' types = ['file', 'dir', 'rev'] entries = [] with db.cursor() as cur: for type in types: ids = directory.pop('%s_entries' % type) if not ids: continue query = query_template % { 'columns': ','.join(columns), 'type': type, } cur.execute(query, (tuple(ids), )) for row in cur: entry = dict(zip(columns, row)) entry['type'] = type entries.append(entry) directory['entries'] = entries return directory def revision_converter(db, revision): """Convert revision from the flat representation to swh model compatible objects. """ revision = db_to_revision(revision) if 'author' in revision and revision['author']: del revision['author']['id'] if 'committer' in revision and revision['committer']: del revision['committer']['id'] return revision def release_converter(db, release): """Convert release from the flat representation to swh model compatible objects. """ release = db_to_release(release) if 'author' in release and release['author']: del release['author']['id'] return release def snapshot_converter(db, snapshot): """Convert snapshot from the flat representation to swh model compatible objects. """ columns = ['name', 'target', 'target_type'] query = ''' select %s from snapshot_branches sbs inner join snapshot_branch sb on sb.object_id=sbs.branch_id where sbs.snapshot_id=%%s ''' % ', '.join(columns) with db.cursor() as cur: cur.execute(query, (snapshot.pop('object_id'), )) branches = {} for name, *row in cur: branch = dict(zip(columns[1:], row)) if not branch['target'] and not branch['target_type']: branch = None branches[name] = branch snapshot['branches'] = branches return snapshot def origin_visit_converter(db, origin_visit): origin = { 'type': origin_visit.pop('type'), 'url': origin_visit.pop('url'), } origin_visit['origin'] = origin return origin_visit CONVERTERS = { 'directory': directory_converter, 'revision': revision_converter, 'release': release_converter, 'snapshot': snapshot_converter, 'origin_visit': origin_visit_converter, } def object_to_offset(object_id, numbits): """Compute the index of the range containing object id, when dividing space into 2^numbits. Args: object_id (str): The hex representation of object_id numbits (int): Number of bits in which we divide input space Returns: The index of the range containing object id """ q, r = divmod(numbits, 8) length = q + (r != 0) shift_bits = 8 - r if r else 0 truncated_id = object_id[:length * 2] if len(truncated_id) < length * 2: truncated_id += '0' * (length * 2 - len(truncated_id)) truncated_id_bytes = bytes.fromhex(truncated_id) return int.from_bytes(truncated_id_bytes, byteorder='big') >> shift_bits def byte_ranges(numbits, start_object=None, end_object=None): """Generate start/end pairs of bytes spanning numbits bits and constrained by optional start_object and end_object. 
Args: numbits (int): Number of bits in which we divide input space start_object (str): Hex object id contained in the first range returned end_object (str): Hex object id contained in the last range returned Yields: 2^numbits pairs of bytes """ q, r = divmod(numbits, 8) length = q + (r != 0) shift_bits = 8 - r if r else 0 def to_bytes(i): return int.to_bytes(i << shift_bits, length=length, byteorder='big') start_offset = 0 end_offset = 1 << numbits if start_object is not None: start_offset = object_to_offset(start_object, numbits) if end_object is not None: end_offset = object_to_offset(end_object, numbits) + 1 for start in range(start_offset, end_offset): end = start + 1 if start == 0: yield None, to_bytes(end) elif end == 1 << numbits: yield to_bytes(start), None else: yield to_bytes(start), to_bytes(end) def integer_ranges(start, end, block_size=1000): for start in range(start, end, block_size): if start == 0: yield None, block_size elif start + block_size > end: yield start, end else: yield start, start + block_size RANGE_GENERATORS = { 'content': lambda start, end: byte_ranges(24, start, end), 'skipped_content': lambda start, end: [(None, None)], 'directory': lambda start, end: byte_ranges(24, start, end), 'revision': lambda start, end: byte_ranges(24, start, end), 'release': lambda start, end: byte_ranges(16, start, end), 'snapshot': lambda start, end: byte_ranges(16, start, end), 'origin': integer_ranges, 'origin_visit': integer_ranges, } def compute_query(obj_type, start, end): columns = COLUMNS.get(obj_type) join_specs = JOINS.get(obj_type, []) join_clause = '\n'.join('left join %s' % clause for clause in join_specs) where = [] where_args = [] if start: where.append('%(keys)s >= %%s') where_args.append(start) if end: where.append('%(keys)s < %%s') where_args.append(end) where_clause = '' if where: where_clause = ('where ' + ' and '.join(where)) % { 'keys': '(%s)' % PARTITION_KEY[obj_type] } column_specs = [] column_aliases = [] for column in columns: if isinstance(column, str): column_specs.append(column) column_aliases.append(column) else: column_specs.append('%s as %s' % column) column_aliases.append(column[1]) query = ''' select %(columns)s from %(table)s %(join)s %(where)s ''' % { 'columns': ','.join(column_specs), 'table': obj_type, 'join': join_clause, 'where': where_clause, } return query, where_args, column_aliases def fetch(db, obj_type, start, end): """Fetch all obj_type's identifiers from db. This opens one connection, stream objects and when done, close the connection. Args: db (BaseDb): Db connection object obj_type (str): Object type start (Union[bytes|Tuple]): Range start identifier end (Union[bytes|Tuple]): Range end identifier Raises: ValueError if obj_type is not supported Yields: Objects in the given range """ query, where_args, column_aliases = compute_query(obj_type, start, end) converter = CONVERTERS.get(obj_type) with db.cursor() as cursor: logger.debug('Fetching data for table %s', obj_type) logger.debug('query: %s %s', query, where_args) cursor.execute(query, where_args) for row in cursor: record = dict(zip(column_aliases, row)) if converter: record = converter(db, record) logger.debug('record: %s' % record) yield record MANDATORY_KEYS = ['brokers', 'storage_dbconn', 'prefix', 'client_id'] class JournalBackfiller: """Class in charge of reading the storage's objects and sends those back to the journal's topics. This is designed to be run periodically. 
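`byte_ranges` then turns those indices back into `(start, end)` byte boundaries, leaving the first and last bounds open, while `integer_ranges` covers the integer-keyed tables; `RANGE_GENERATORS` simply picks one of the two per object type. A small sketch of the boundaries they yield:

```
from swh.journal.backfill import byte_ranges, integer_ranges

# byte_ranges(8) splits the id space into 256 one-byte buckets; the first and
# last boundaries are left open (None) so nothing at the edges is missed.
ranges = list(byte_ranges(8))
assert len(ranges) == 256
assert ranges[0] == (None, b'\x01')
assert ranges[1] == (b'\x01', b'\x02')
assert ranges[-1] == (b'\xff', None)

# integer_ranges is used for origin / origin_visit, keyed by integer ids.
assert list(integer_ranges(0, 2500))[:2] == [(None, 1000), (1000, 2000)]
assert list(integer_ranges(0, 2500))[-1] == (2000, 2500)
```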
""" def __init__(self, config=None): self.config = config self.check_config(config) def check_config(self, config): missing_keys = [] for key in MANDATORY_KEYS: if not config.get(key): missing_keys.append(key) if missing_keys: raise ValueError( 'Configuration error: The following keys must be' ' provided: %s' % (','.join(missing_keys), )) def parse_arguments(self, object_type, start_object, end_object): """Parse arguments Raises: ValueError for unsupported object type ValueError if object ids are not parseable Returns: Parsed start and end object ids """ if object_type not in COLUMNS: raise ValueError('Object type %s is not supported. ' 'The only possible values are %s' % ( object_type, ', '.join(COLUMNS.keys()))) if object_type in ['origin', 'origin_visit']: if start_object: start_object = int(start_object) else: start_object = 0 if end_object: end_object = int(end_object) else: end_object = 100 * 1000 * 1000 # hard-coded limit return start_object, end_object def run(self, object_type, start_object, end_object, dry_run=False): """Reads storage's subscribed object types and send them to the journal's reading topic. """ start_object, end_object = self.parse_arguments( object_type, start_object, end_object) db = BaseDb.connect(self.config['storage_dbconn']) writer = DirectKafkaWriter( brokers=self.config['brokers'], prefix=self.config['prefix'], client_id=self.config['client_id'] ) for range_start, range_end in RANGE_GENERATORS[object_type]( start_object, end_object): logger.info('Processing %s range %s to %s', object_type, range_start, range_end) for obj in fetch( db, object_type, start=range_start, end=range_end, ): if dry_run: continue writer.write_addition(object_type=object_type, object_=obj) writer.producer.flush() if __name__ == '__main__': print('Please use the "swh-journal backfiller run" command') diff --git a/swh/journal/cli.py b/swh/journal/cli.py index 7bfe982..ce11024 100644 --- a/swh/journal/cli.py +++ b/swh/journal/cli.py @@ -1,202 +1,202 @@ # Copyright (C) 2016-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import click import functools import logging import os from swh.core import config from swh.core.cli import CONTEXT_SETTINGS from swh.storage import get_storage from swh.objstorage import get_objstorage from swh.journal.client import JournalClient from swh.journal.replay import process_replay_objects from swh.journal.replay import process_replay_objects_content from swh.journal.backfill import JournalBackfiller @click.group(name='journal', context_settings=CONTEXT_SETTINGS) @click.option('--config-file', '-C', default=None, type=click.Path(exists=True, dir_okay=False,), help="Configuration file.") @click.pass_context def cli(ctx, config_file): """Software Heritage Journal tools. The journal is a persistent logger of changes to the archive, with publish-subscribe support. 
""" if not config_file: config_file = os.environ.get('SWH_CONFIG_FILENAME') if config_file: if not os.path.exists(config_file): raise ValueError('%s does not exist' % config_file) conf = config.read(config_file) else: conf = {} ctx.ensure_object(dict) log_level = ctx.obj.get('log_level', logging.INFO) logging.root.setLevel(log_level) logging.getLogger('kafka').setLevel(logging.INFO) ctx.obj['config'] = conf def get_journal_client(ctx, **kwargs): conf = ctx.obj['config'].get('journal', {}) conf.update({k: v for (k, v) in kwargs.items() if v not in (None, ())}) if not conf.get('brokers'): ctx.fail('You must specify at least one kafka broker.') if not isinstance(conf['brokers'], (list, tuple)): conf['brokers'] = [conf['brokers']] return JournalClient(**conf) @cli.command() @click.option('--max-messages', '-m', default=None, type=int, help='Maximum number of objects to replay. Default is to ' 'run forever.') @click.option('--broker', 'brokers', type=str, multiple=True, help='Kafka broker to connect to. ' '(deprecated, use the config file instead)') @click.option('--prefix', type=str, default=None, help='Prefix of Kafka topic names to read from. ' '(deprecated, use the config file instead)') -@click.option('--group-id', '--consumer-id', type=str, - help='Name of the consumer/group id for reading from Kafka. ' +@click.option('--group-id', type=str, + help='Name of the group id for reading from Kafka. ' '(deprecated, use the config file instead)') @click.pass_context def replay(ctx, brokers, prefix, group_id, max_messages): """Fill a Storage by reading a Journal. There can be several 'replayers' filling a Storage as long as they use the same `group-id`. """ logger = logging.getLogger(__name__) conf = ctx.obj['config'] try: storage = get_storage(**conf.pop('storage')) except KeyError: ctx.fail('You must have a storage configured in your config file.') client = get_journal_client( ctx, brokers=brokers, prefix=prefix, group_id=group_id) worker_fn = functools.partial(process_replay_objects, storage=storage) try: nb_messages = 0 while not max_messages or nb_messages < max_messages: nb_messages += client.process(worker_fn) logger.info('Processed %d messages.' % nb_messages) except KeyboardInterrupt: ctx.exit(0) else: print('Done.') @cli.command() @click.argument('object_type') @click.option('--start-object', default=None) @click.option('--end-object', default=None) @click.option('--dry-run', is_flag=True, default=False) @click.pass_context def backfiller(ctx, object_type, start_object, end_object, dry_run): """Run the backfiller The backfiller list objects from a Storage and produce journal entries from there. Typically used to rebuild a journal or compensate for missing objects in a journal (eg. due to a downtime of this later). The configuration file requires the following entries: - brokers: a list of kafka endpoints (the journal) in which entries will be added. - storage_dbconn: URL to connect to the storage DB. - prefix: the prefix of the topics (topics will be .). - client_id: the kafka client ID. """ conf = ctx.obj['config'] backfiller = JournalBackfiller(conf) try: backfiller.run( object_type=object_type, start_object=start_object, end_object=end_object, dry_run=dry_run) except KeyboardInterrupt: ctx.exit(0) @cli.command() @click.option('--concurrency', type=int, default=8, help='Concurrentcy level.') @click.option('--broker', 'brokers', type=str, multiple=True, help='Kafka broker to connect to.' 
'(deprecated, use the config file instead)') @click.option('--prefix', type=str, default=None, help='Prefix of Kafka topic names to read from.' '(deprecated, use the config file instead)') -@click.option('--group-id', '--consumer-id', type=str, - help='Name of the consumer/group id for reading from Kafka.' +@click.option('--group-id', type=str, + help='Name of the group id for reading from Kafka.' '(deprecated, use the config file instead)') @click.pass_context def content_replay(ctx, concurrency, brokers, prefix, group_id): """Fill a destination Object Storage (typically a mirror) by reading a Journal and retrieving objects from an existing source ObjStorage. There can be several 'replayers' filling a given ObjStorage as long as they use the same `group-id`. This service retrieves object ids to copy from the 'content' topic. It will only copy object's content if the object's description in the kafka nmessage has the status:visible set. """ logger = logging.getLogger(__name__) conf = ctx.obj['config'] try: objstorage_src = get_objstorage(**conf.pop('objstorage_src')) except KeyError: ctx.fail('You must have a source objstorage configured in ' 'your config file.') try: objstorage_dst = get_objstorage(**conf.pop('objstorage_dst')) except KeyError: ctx.fail('You must have a destination objstorage configured ' 'in your config file.') client = get_journal_client( ctx, brokers=brokers, prefix=prefix, group_id=group_id, object_types=('content',)) worker_fn = functools.partial(process_replay_objects_content, src=objstorage_src, dst=objstorage_dst, concurrency=concurrency) try: nb_messages = 0 while True: nb_messages += client.process(worker_fn) logger.info('Processed %d messages.' % nb_messages) except KeyboardInterrupt: ctx.exit(0) else: logger.info('Done.') def main(): logging.basicConfig() return cli(auto_envvar_prefix='SWH_JOURNAL') if __name__ == '__main__': main() diff --git a/swh/journal/client.py b/swh/journal/client.py index 56c6519..473fb0e 100644 --- a/swh/journal/client.py +++ b/swh/journal/client.py @@ -1,108 +1,109 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from kafka import KafkaConsumer import logging +from kafka import KafkaConsumer + from .serializers import kafka_to_key, kafka_to_value from swh.journal import DEFAULT_PREFIX logger = logging.getLogger(__name__) # Only accepted offset reset policy accepted ACCEPTED_OFFSET_RESET = ['earliest', 'latest'] # Only accepted object types ACCEPTED_OBJECT_TYPES = [ 'content', 'directory', 'revision', 'release', 'snapshot', 'origin', 'origin_visit' ] class JournalClient: """A base client for the Software Heritage journal. The current implementation of the journal uses Apache Kafka brokers to publish messages under a given topic prefix, with each object type using a specific topic under that prefix. If the 'prefix' argument is None (default value), it will take the default value 'swh.journal.objects'. Clients subscribe to events specific to each object type as listed in the `object_types` argument (if unset, defaults to all accepted objet types). Clients can be sharded by setting the `group_id` to a common value across instances. The journal will share the message throughput across the nodes sharing the same group_id. Messages are processed by the `process_objects` method in batches of maximum `max_messages`. 
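`content_replay` reuses the same client machinery but hands each batch to `process_replay_objects_content`, which copies the referenced blobs from a source objstorage to a destination one. A reduced sketch of that copy step, using the in-memory storage's objstorage on both sides (as the tests do) rather than real object storages:

```
import hashlib

from swh.objstorage.objstorage import ID_HASH_ALGO
from swh.storage.in_memory import Storage
from swh.journal.replay import process_replay_objects_content

src = Storage().objstorage          # stand-in for objstorage_src
dst = Storage().objstorage          # stand-in for objstorage_dst

data = b'some content'
obj_id = hashlib.sha1(data).digest()   # objstorage ids are keyed by ID_HASH_ALGO (sha1)
src.add(data, obj_id=obj_id)

# The replayer only sees content *metadata* from Kafka; the 'status' field
# decides whether the blob is actually copied to the destination.
messages = {'content': [{ID_HASH_ALGO: obj_id, 'status': 'visible'}]}

process_replay_objects_content(messages, src=src, dst=dst, concurrency=2)
assert dst.get(obj_id) == data
```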
Any other named argument is passed directly to KafkaConsumer(). """ def __init__( self, brokers, group_id, prefix=None, object_types=None, max_messages=0, auto_offset_reset='earliest', **kwargs): if prefix is None: prefix = DEFAULT_PREFIX if object_types is None: object_types = ACCEPTED_OBJECT_TYPES if auto_offset_reset not in ACCEPTED_OFFSET_RESET: raise ValueError( 'Option \'auto_offset_reset\' only accept %s.' % ACCEPTED_OFFSET_RESET) for object_type in object_types: if object_type not in ACCEPTED_OBJECT_TYPES: raise ValueError( 'Option \'object_types\' only accepts %s.' % ACCEPTED_OFFSET_RESET) self.consumer = KafkaConsumer( bootstrap_servers=brokers, key_deserializer=kafka_to_key, value_deserializer=kafka_to_value, auto_offset_reset=auto_offset_reset, enable_auto_commit=False, group_id=group_id, **kwargs) self.consumer.subscribe( topics=['%s.%s' % (prefix, object_type) for object_type in object_types], ) self.max_messages = max_messages self._object_types = object_types def process(self, worker_fn): """Polls Kafka for a batch of messages, and calls the worker_fn with these messages. Args: worker_fn Callable[Dict[str, List[dict]]]: Function called with the messages as argument. """ nb_messages = 0 polled = self.consumer.poll() for (partition, messages) in polled.items(): object_type = partition.topic.split('.')[-1] # Got a message from a topic we did not subscribe to. assert object_type in self._object_types, object_type worker_fn({object_type: [msg.value for msg in messages]}) nb_messages += len(messages) self.consumer.commit() return nb_messages diff --git a/swh/journal/replay.py b/swh/journal/replay.py index f5cc762..bc44939 100644 --- a/swh/journal/replay.py +++ b/swh/journal/replay.py @@ -1,90 +1,99 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from time import time import logging from concurrent.futures import ThreadPoolExecutor from swh.storage import HashCollision from swh.model.hashutil import hash_to_hex from swh.objstorage.objstorage import ID_HASH_ALGO from swh.core.statsd import statsd logger = logging.getLogger(__name__) def process_replay_objects(all_objects, *, storage): for (object_type, objects) in all_objects.items(): _insert_objects(object_type, objects, storage) def _insert_objects(object_type, objects, storage): if object_type == 'content': # TODO: insert 'content' in batches for object_ in objects: try: storage.content_add_metadata([object_]) except HashCollision as e: logger.error('Hash collision: %s', e.args) elif object_type in ('directory', 'revision', 'release', 'snapshot', 'origin'): # TODO: split batches that are too large for the storage # to handle? method = getattr(storage, object_type + '_add') method(objects) elif object_type == 'origin_visit': - storage.origin_visit_upsert([ - { - **obj, - 'origin': storage.origin_add_one(obj['origin']) - } - for obj in objects]) + for visit in objects: + if isinstance(visit['origin'], str): + # old format; note that it will crash with the pg and + # in-mem storages if the origin is not already known, + # but there is no other choice because we can't add an + # origin without knowing its type. 
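`JournalClient.process` hands the worker one dict per polled partition, mapping the object type to the list of decoded message values, so any callable accepting that shape can act as a replayer. As an illustration, a hypothetical worker that merely counts objects per type (assuming a broker on localhost):

```
from collections import Counter

from swh.journal.client import JournalClient

counts = Counter()

def count_objects(all_objects):
    # all_objects has the shape {'revision': [dict, ...]}, one type per call
    for object_type, objects in all_objects.items():
        counts[object_type] += len(objects)

client = JournalClient(brokers=['localhost:9092'],
                       group_id='counter-example',
                       object_types=['revision', 'release'])
client.process(count_objects)
print(counts)
```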
Non-pg storages + # don't use a numeric FK internally, + visit['origin'] = {'url': visit['origin']} + else: + storage.origin_add_one(visit['origin']) + if 'type' not in visit: + # old format + visit['type'] = visit['origin']['type'] + + storage.origin_visit_upsert(objects) else: logger.warning('Received a series of %s, this should not happen', object_type) def copy_object(obj_id, src, dst): statsd_name = 'swh_journal_content_replayer_%s_duration_seconds' try: with statsd.timed(statsd_name % 'get'): obj = src.get(obj_id) with statsd.timed(statsd_name % 'put'): dst.add(obj, obj_id=obj_id, check_presence=False) logger.debug('copied %s', hash_to_hex(obj_id)) statsd.increment( 'swh_journal_content_replayer_bytes_total', len(obj)) except Exception: obj = '' logger.exception('Failed to copy %s', hash_to_hex(obj_id)) return len(obj) def process_replay_objects_content(all_objects, *, src, dst, concurrency=8): vol = [] t0 = time() with ThreadPoolExecutor(max_workers=concurrency) as executor: for (object_type, objects) in all_objects.items(): if object_type != 'content': logger.warning( 'Received a series of %s, this should not happen', object_type) continue for obj in objects: obj_id = obj[ID_HASH_ALGO] if obj['status'] == 'visible': fut = executor.submit(copy_object, obj_id, src, dst) fut.add_done_callback(lambda fn: vol.append(fn.result())) else: logger.debug('skipped %s (%s)', hash_to_hex(obj_id), obj['status']) dt = time() - t0 logger.info( 'processed %s content objects in %.1fsec ' '(%.1f obj/sec, %.1fMB/sec) - %s failures', len(vol), dt, len(vol)/dt, sum(vol)/1024/1024/dt, len([x for x in vol if not x])) diff --git a/swh/journal/tests/test_backfill.py b/swh/journal/tests/test_backfill.py index a604cbf..0f39461 100644 --- a/swh/journal/tests/test_backfill.py +++ b/swh/journal/tests/test_backfill.py @@ -1,123 +1,123 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest from swh.journal.backfill import ( JournalBackfiller, compute_query, PARTITION_KEY ) TEST_CONFIG = { 'brokers': ['localhost'], 'prefix': 'swh.tmp_journal.new', 'client_id': 'swh.journal.client.test', 'storage_dbconn': 'service=swh-dev', } def test_config_ko_missing_mandatory_key(): """Missing configuration key will make the initialization fail """ for key in TEST_CONFIG.keys(): config = TEST_CONFIG.copy() config.pop(key) with pytest.raises(ValueError) as e: JournalBackfiller(config) error = ('Configuration error: The following keys must be' ' provided: %s' % (','.join([key]), )) assert e.value.args[0] == error def test_config_ko_unknown_object_type(): """Parse arguments will fail if the object type is unknown """ backfiller = JournalBackfiller(TEST_CONFIG) with pytest.raises(ValueError) as e: backfiller.parse_arguments('unknown-object-type', 1, 2) error = ('Object type unknown-object-type is not supported. 
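For reference, roughly the three `origin_visit` shapes the replayer now copes with (illustrative values):

```
import datetime

now = datetime.datetime.now(tz=datetime.timezone.utc)

# current format: 'origin' is a dict and the visit carries its own 'type'
visit = {'origin': {'url': 'http://example.com/', 'type': 'git'},
         'type': 'git', 'visit': 1, 'date': now}

# old format 1: 'origin' is a bare URL string; the replayer wraps it into
# {'url': ...} but cannot create the origin itself (its type is unknown)
legacy_visit1 = {'origin': 'http://example.com/', 'visit': 1, 'date': now}

# old format 2: the visit has no 'type'; it is copied from the nested
# origin dict before origin_visit_upsert is called
legacy_visit2 = {'origin': {'url': 'http://example.com/', 'type': 'git'},
                 'visit': 1, 'date': now}
```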
' 'The only possible values are %s' % ( ', '.join(PARTITION_KEY))) assert e.value.args[0] == error def test_compute_query_content(): query, where_args, column_aliases = compute_query( 'content', '\x000000', '\x000001') assert where_args == ['\x000000', '\x000001'] assert column_aliases == [ 'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', 'status', 'ctime' ] assert query == ''' select sha1,sha1_git,sha256,blake2s256,length,status,ctime from content where (sha1) >= %s and (sha1) < %s ''' def test_compute_query_skipped_content(): query, where_args, column_aliases = compute_query( 'skipped_content', None, None) assert where_args == [] assert column_aliases == [ 'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', 'ctime', 'status', 'reason', ] assert query == ''' select sha1,sha1_git,sha256,blake2s256,length,ctime,status,reason from skipped_content ''' def test_compute_query_origin_visit(): query, where_args, column_aliases = compute_query( 'origin_visit', 1, 10) assert where_args == [1, 10] assert column_aliases == [ - 'type', 'url', 'date', 'snapshot', 'status' + 'visit', 'type', 'url', 'date', 'snapshot', 'status' ] assert query == ''' -select type,url,date,snapshot,status +select visit,type,url,date,snapshot,status from origin_visit left join origin on origin_visit.origin=origin.id where (origin_visit.origin) >= %s and (origin_visit.origin) < %s ''' def test_compute_query_release(): query, where_args, column_aliases = compute_query( 'release', '\x000002', '\x000003') assert where_args == ['\x000002', '\x000003'] assert column_aliases == [ 'id', 'date', 'date_offset', 'comment', 'name', 'synthetic', 'date_neg_utc_offset', 'target', 'target_type', 'author_id', 'author_name', 'author_email', 'author_fullname'] assert query == ''' select release.id as id,date,date_offset,comment,release.name as name,synthetic,date_neg_utc_offset,target,target_type,a.id as author_id,a.name as author_name,a.email as author_email,a.fullname as author_fullname from release left join person a on release.author=a.id where (release.id) >= %s and (release.id) < %s ''' # noqa diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py index 9499c99..c4ab9c5 100644 --- a/swh/journal/tests/test_replay.py +++ b/swh/journal/tests/test_replay.py @@ -1,102 +1,198 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import functools import random from subprocess import Popen from typing import Tuple import dateutil from kafka import KafkaProducer from swh.storage import get_storage +from swh.storage.in_memory import ENABLE_ORIGIN_IDS from swh.journal.client import JournalClient from swh.journal.serializers import key_to_kafka, value_to_kafka from swh.journal.replay import process_replay_objects from .conftest import OBJECT_TYPE_KEYS +from .utils import MockedJournalClient, MockedKafkaWriter def test_storage_play( kafka_prefix: str, kafka_server: Tuple[Popen, int]): (_, port) = kafka_server kafka_prefix += '.swh.journal.objects' storage = get_storage('memory', {}) producer = KafkaProducer( bootstrap_servers='localhost:{}'.format(port), key_serializer=key_to_kafka, value_serializer=value_to_kafka, client_id='test producer', ) now = datetime.datetime.now(tz=datetime.timezone.utc) # Fill Kafka nb_sent = 0 nb_visits = 0 for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items(): topic = 
kafka_prefix + '.' + object_type for object_ in objects: key = bytes(random.randint(0, 255) for _ in range(40)) object_ = object_.copy() if object_type == 'content': object_['ctime'] = now elif object_type == 'origin_visit': nb_visits += 1 object_['visit'] = nb_visits producer.send(topic, key=key, value=object_) nb_sent += 1 # Fill the storage from Kafka config = { 'brokers': 'localhost:%d' % kafka_server[1], 'group_id': 'replayer', 'prefix': kafka_prefix, 'max_messages': nb_sent, } replayer = JournalClient(**config) worker_fn = functools.partial(process_replay_objects, storage=storage) nb_inserted = 0 while nb_inserted < nb_sent: nb_inserted += replayer.process(worker_fn) assert nb_sent == nb_inserted # Check the objects were actually inserted in the storage assert OBJECT_TYPE_KEYS['revision'][1] == \ list(storage.revision_get( [rev['id'] for rev in OBJECT_TYPE_KEYS['revision'][1]])) assert OBJECT_TYPE_KEYS['release'][1] == \ list(storage.release_get( [rel['id'] for rel in OBJECT_TYPE_KEYS['release'][1]])) origins = list(storage.origin_get( [orig for orig in OBJECT_TYPE_KEYS['origin'][1]])) assert OBJECT_TYPE_KEYS['origin'][1] == \ [{'url': orig['url'], 'type': orig['type']} for orig in origins] for origin in origins: expected_visits = [ { **visit, 'origin': origin['id'], 'date': dateutil.parser.parse(visit['date']), } for visit in OBJECT_TYPE_KEYS['origin_visit'][1] if visit['origin']['url'] == origin['url'] and visit['origin']['type'] == origin['type'] ] actual_visits = list(storage.origin_visit_get(origin['id'])) for visit in actual_visits: del visit['visit'] # opaque identifier assert expected_visits == actual_visits contents = list(storage.content_get_metadata( [cont['sha1'] for cont in OBJECT_TYPE_KEYS['content'][1]])) assert None not in contents assert contents == OBJECT_TYPE_KEYS['content'][1] + + +def test_write_replay_legacy_origin_visit1(): + """Test origin_visit when the 'origin' is just a string.""" + queue = [] + replayer = MockedJournalClient(queue) + writer = MockedKafkaWriter(queue) + + # Note that flipping the order of these two insertions will crash + # the test, because the legacy origin_format does not allow to create + # the origin when needed (type is missing) + now = datetime.datetime.now() + writer.send('origin', 'foo', { + 'url': 'http://example.com/', + 'type': 'git', + }) + writer.send('origin_visit', 'foo', { + 'visit': 1, + 'origin': 'http://example.com/', + 'date': now, + }) + + queue_size = sum(len(partition) + for batch in queue + for partition in batch.values()) + + storage = get_storage('memory', {}) + worker_fn = functools.partial(process_replay_objects, storage=storage) + nb_messages = 0 + while nb_messages < queue_size: + nb_messages += replayer.process(worker_fn) + + visits = list(storage.origin_visit_get('http://example.com/')) + + if ENABLE_ORIGIN_IDS: + assert visits == [{ + 'visit': 1, + 'origin': 1, + 'date': now, + }] + else: + assert visits == [{ + 'visit': 1, + 'origin': {'url': 'http://example.com/'}, + 'date': now, + }] + + +def test_write_replay_legacy_origin_visit2(): + """Test origin_visit when 'type' is missing.""" + queue = [] + replayer = MockedJournalClient(queue) + writer = MockedKafkaWriter(queue) + + now = datetime.datetime.now() + writer.send('origin', 'foo', { + 'url': 'http://example.com/', + 'type': 'git', + }) + writer.send('origin_visit', 'foo', { + 'visit': 1, + 'origin': { + 'url': 'http://example.com/', + 'type': 'git', + }, + 'date': now, + }) + + queue_size = sum(len(partition) + for batch in queue + for partition in 
batch.values()) + + storage = get_storage('memory', {}) + worker_fn = functools.partial(process_replay_objects, storage=storage) + nb_messages = 0 + while nb_messages < queue_size: + nb_messages += replayer.process(worker_fn) + + visits = list(storage.origin_visit_get('http://example.com/')) + + if ENABLE_ORIGIN_IDS: + assert visits == [{ + 'visit': 1, + 'origin': 1, + 'date': now, + 'type': 'git', + }] + else: + assert visits == [{ + 'visit': 1, + 'origin': {'url': 'http://example.com/'}, + 'date': now, + 'type': 'git', + }] diff --git a/swh/journal/tests/test_write_replay.py b/swh/journal/tests/test_write_replay.py index 86e5e49..caee858 100644 --- a/swh/journal/tests/test_write_replay.py +++ b/swh/journal/tests/test_write_replay.py @@ -1,133 +1,91 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from collections import namedtuple import functools from hypothesis import given, settings, HealthCheck from hypothesis.strategies import lists from swh.model.hypothesis_strategies import object_dicts from swh.storage.in_memory import Storage from swh.storage import HashCollision -from swh.journal.client import JournalClient, ACCEPTED_OBJECT_TYPES -from swh.journal.direct_writer import DirectKafkaWriter from swh.journal.replay import process_replay_objects from swh.journal.replay import process_replay_objects_content -from swh.journal.serializers import ( - key_to_kafka, kafka_to_key, value_to_kafka, kafka_to_value) - -FakeKafkaMessage = namedtuple('FakeKafkaMessage', 'key value') -FakeKafkaPartition = namedtuple('FakeKafkaPartition', 'topic') - - -class MockedKafkaWriter(DirectKafkaWriter): - def __init__(self, queue): - self._prefix = 'prefix' - self.queue = queue - - def send(self, topic, key, value): - key = kafka_to_key(key_to_kafka(key)) - value = kafka_to_value(value_to_kafka(value)) - partition = FakeKafkaPartition(topic) - msg = FakeKafkaMessage(key=key, value=value) - if self.queue and {partition} == set(self.queue[-1]): - # The last message is of the same object type, groupping them - self.queue[-1][partition].append(msg) - else: - self.queue.append({partition: [msg]}) - - -class MockedKafkaConsumer: - def __init__(self, queue): - self.queue = queue - self.committed = False - - def poll(self): - return self.queue.pop(0) - - def commit(self): - if self.queue == []: - self.committed = True - -class MockedJournalClient(JournalClient): - def __init__(self, queue, object_types=ACCEPTED_OBJECT_TYPES): - self._object_types = object_types - self.consumer = MockedKafkaConsumer(queue) +from .utils import MockedJournalClient, MockedKafkaWriter @given(lists(object_dicts(), min_size=1)) @settings(suppress_health_check=[HealthCheck.too_slow]) def test_write_replay_same_order_batches(objects): queue = [] replayer = MockedJournalClient(queue) storage1 = Storage() storage1.journal_writer = MockedKafkaWriter(queue) for (obj_type, obj) in objects: obj = obj.copy() if obj_type == 'origin_visit': origin_id = storage1.origin_add_one(obj.pop('origin')) if 'visit' in obj: del obj['visit'] storage1.origin_visit_add(origin_id, **obj) else: method = getattr(storage1, obj_type + '_add') try: method([obj]) except HashCollision: pass queue_size = sum(len(partition) for batch in queue for partition in batch.values()) storage2 = Storage() worker_fn = functools.partial(process_replay_objects, storage=storage2) 
nb_messages = 0 while nb_messages < queue_size: nb_messages += replayer.process(worker_fn) assert replayer.consumer.committed for attr_name in ('_contents', '_directories', '_revisions', '_releases', '_snapshots', '_origin_visits', '_origins'): assert getattr(storage1, attr_name) == getattr(storage2, attr_name), \ attr_name # TODO: add test for hash collision @given(lists(object_dicts(), min_size=1)) @settings(suppress_health_check=[HealthCheck.too_slow]) def test_write_replay_content(objects): queue = [] replayer = MockedJournalClient(queue) storage1 = Storage() storage1.journal_writer = MockedKafkaWriter(queue) for (obj_type, obj) in objects: obj = obj.copy() if obj_type == 'content': storage1.content_add([obj]) queue_size = sum(len(partition) for batch in queue for partition in batch.values()) storage2 = Storage() worker_fn = functools.partial(process_replay_objects_content, src=storage1.objstorage, dst=storage2.objstorage) nb_messages = 0 while nb_messages < queue_size: nb_messages += replayer.process(worker_fn) assert storage1.objstorage.state == storage2.objstorage.state diff --git a/swh/journal/tests/utils.py b/swh/journal/tests/utils.py new file mode 100644 index 0000000..4812607 --- /dev/null +++ b/swh/journal/tests/utils.py @@ -0,0 +1,45 @@ +from collections import namedtuple + +from swh.journal.client import JournalClient, ACCEPTED_OBJECT_TYPES +from swh.journal.direct_writer import DirectKafkaWriter +from swh.journal.serializers import ( + key_to_kafka, kafka_to_key, value_to_kafka, kafka_to_value) + +FakeKafkaMessage = namedtuple('FakeKafkaMessage', 'key value') +FakeKafkaPartition = namedtuple('FakeKafkaPartition', 'topic') + + +class MockedKafkaWriter(DirectKafkaWriter): + def __init__(self, queue): + self._prefix = 'prefix' + self.queue = queue + + def send(self, topic, key, value): + key = kafka_to_key(key_to_kafka(key)) + value = kafka_to_value(value_to_kafka(value)) + partition = FakeKafkaPartition(topic) + msg = FakeKafkaMessage(key=key, value=value) + if self.queue and {partition} == set(self.queue[-1]): + # The last message is of the same object type, groupping them + self.queue[-1][partition].append(msg) + else: + self.queue.append({partition: [msg]}) + + +class MockedKafkaConsumer: + def __init__(self, queue): + self.queue = queue + self.committed = False + + def poll(self): + return self.queue.pop(0) + + def commit(self): + if self.queue == []: + self.committed = True + + +class MockedJournalClient(JournalClient): + def __init__(self, queue, object_types=ACCEPTED_OBJECT_TYPES): + self._object_types = object_types + self.consumer = MockedKafkaConsumer(queue) diff --git a/version.txt b/version.txt index 946f3cd..c7985c7 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.0.13-0-g687d06e \ No newline at end of file +v0.0.14-0-ge12d72a \ No newline at end of file
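The new `tests/utils.py` mocks make it possible to exercise the writer → journal → replayer path without a Kafka broker: `MockedKafkaWriter` appends serialized messages to a plain list and `MockedKafkaConsumer` pops them back off. A minimal round-trip sketch built on these helpers, mirroring the origin_visit tests above:

```
import datetime
import functools

from swh.journal.replay import process_replay_objects
from swh.journal.tests.utils import MockedJournalClient, MockedKafkaWriter
from swh.storage.in_memory import Storage

queue = []
writer = MockedKafkaWriter(queue)
replayer = MockedJournalClient(queue)

# Write an origin and one visit through the mocked journal (the origin must
# come first so the replayer can resolve it when upserting the visit).
now = datetime.datetime.now()
writer.send('origin', 'key1', {'url': 'http://example.com/', 'type': 'git'})
writer.send('origin_visit', 'key2', {
    'visit': 1,
    'origin': {'url': 'http://example.com/', 'type': 'git'},
    'date': now,
})

# ...then replay the queued batches into a fresh in-memory storage.
storage = Storage()
worker_fn = functools.partial(process_replay_objects, storage=storage)
while not replayer.consumer.committed:
    replayer.process(worker_fn)

assert len(list(storage.origin_visit_get('http://example.com/'))) == 1
```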