D1540.id5136.diff

diff --git a/setup.py b/setup.py
--- a/setup.py
+++ b/setup.py
@@ -48,6 +48,8 @@
entry_points='''
[console_scripts]
swh-journal=swh.journal.cli:main
+ [swh.cli.subcommands]
+ journal=swh.journal.cli:cli
''',
install_requires=parse_requirements() + parse_requirements('swh'),
setup_requires=['vcversioner'],
diff --git a/swh/journal/__init__.py b/swh/journal/__init__.py
--- a/swh/journal/__init__.py
+++ b/swh/journal/__init__.py
@@ -0,0 +1,7 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+# the default prefix for kafka's topics
+DEFAULT_PREFIX = 'swh.journal.objects'
diff --git a/swh/journal/backfill.py b/swh/journal/backfill.py
--- a/swh/journal/backfill.py
+++ b/swh/journal/backfill.py
@@ -366,7 +366,7 @@
yield record
-MANDATORY_KEYS = ['brokers', 'storage_dbconn', 'final_prefix', 'client_id']
+MANDATORY_KEYS = ['brokers', 'storage_dbconn', 'prefix', 'client_id']
class JournalBackfiller:
@@ -430,7 +430,7 @@
db = BaseDb.connect(self.config['storage_dbconn'])
writer = DirectKafkaWriter(
brokers=self.config['brokers'],
- prefix=self.config['final_prefix'],
+ prefix=self.config['prefix'],
client_id=self.config['client_id']
)
for range_start, range_end in RANGE_GENERATORS[object_type](
diff --git a/swh/journal/cli.py b/swh/journal/cli.py
--- a/swh/journal/cli.py
+++ b/swh/journal/cli.py
@@ -9,52 +9,43 @@
import os
from swh.core import config
+from swh.core.cli import CONTEXT_SETTINGS
from swh.storage import get_storage
from swh.journal.client import JournalClient
from swh.journal.replay import process_replay_objects
from swh.journal.backfill import JournalBackfiller
-CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
-
-@click.group(context_settings=CONTEXT_SETTINGS)
+@click.group(name='journal', context_settings=CONTEXT_SETTINGS)
@click.option('--config-file', '-C', default=None,
type=click.Path(exists=True, dir_okay=False,),
help="Configuration file.")
-@click.option('--log-level', '-l', default='INFO',
- type=click.Choice(logging._nameToLevel.keys()),
- help="Log level (default to INFO)")
@click.pass_context
-def cli(ctx, config_file, log_level):
- """Software Heritage Scheduler CLI interface
+def cli(ctx, config_file):
+ """Software Heritage Journal tools.
- Default to use the the local scheduler instance (plugged to the
- main scheduler db).
+ The journal is a persistent logger of changes to the archive, with
+ publish-subscribe support.
"""
if not config_file:
config_file = os.environ.get('SWH_CONFIG_FILENAME')
- if not config_file:
- raise ValueError('You must either pass a config-file parameter '
- 'or set SWH_CONFIG_FILENAME to target '
- 'the config-file')
- if not os.path.exists(config_file):
- raise ValueError('%s does not exist' % config_file)
+ if config_file:
+ if not os.path.exists(config_file):
+ raise ValueError('%s does not exist' % config_file)
+ conf = config.read(config_file)
+ else:
+ conf = {}
- conf = config.read(config_file)
ctx.ensure_object(dict)
- logging.basicConfig(
- level=log_level,
- format='%(asctime)s %(levelname)s %(name)s %(message)s',
- )
-
+ log_level = ctx.obj.get('log_level', logging.INFO)
+ logging.root.setLevel(log_level)
logging.getLogger('kafka').setLevel(logging.INFO)
ctx.obj['config'] = conf
- ctx.obj['loglevel'] = log_level
@cli.command()
@@ -62,22 +53,44 @@
help='Maximum number of objects to replay. Default is to '
'run forever.')
@click.option('--broker', 'brokers', type=str, multiple=True,
+ hidden=True, # prefer config file
help='Kafka broker to connect to.')
-@click.option('--prefix', type=str, default='swh.journal.objects',
+@click.option('--prefix', type=str, default=None,
+ hidden=True, # prefer config file
help='Prefix of Kafka topic names to read from.')
-@click.option('--consumer-id', type=str,
+@click.option('--group-id', '--consumer-id', type=str,
+ hidden=True, # prefer config file
help='Name of the consumer/group id for reading from Kafka.')
@click.pass_context
-def replay(ctx, brokers, prefix, consumer_id, max_messages):
- """Fill a new storage by reading a journal.
+def replay(ctx, brokers, prefix, group_id, max_messages):
+ """Fill a Storage by reading a Journal.
+ There can be several 'replayers' filling a Storage as long as they use
+ the same `group-id`.
"""
- conf = ctx.obj['config']
logger = logging.getLogger(__name__)
- logger.setLevel(ctx.obj['loglevel'])
- storage = get_storage(**conf.pop('storage'))
- client = JournalClient(brokers, prefix, consumer_id)
+ conf = ctx.obj['config']
+ try:
+ storage = get_storage(**conf.pop('storage'))
+ except KeyError:
+ ctx.fail('You must have a storage configured in your config file.')
+
+ if brokers is None:
+ brokers = conf.get('journal', {}).get('brokers')
+ if not brokers:
+ ctx.fail('You must specify at least one kafka broker.')
+ if not isinstance(brokers, (list, tuple)):
+ brokers = [brokers]
+
+ if prefix is None:
+ prefix = conf.get('journal', {}).get('prefix')
+
+ if group_id is None:
+ group_id = conf.get('journal', {}).get('group_id')
+
+ client = JournalClient(brokers=brokers, group_id=group_id, prefix=prefix)
worker_fn = functools.partial(process_replay_objects, storage=storage)
+
try:
nb_messages = 0
while not max_messages or nb_messages < max_messages:
@@ -96,7 +109,20 @@
@click.option('--dry-run', is_flag=True, default=False)
@click.pass_context
def backfiller(ctx, object_type, start_object, end_object, dry_run):
- """Manipulate backfiller
+ """Run the backfiller
+
+    The backfiller lists objects from a Storage and produces journal entries
+    from them.
+
+    Typically used to rebuild a journal or compensate for missing objects in a
+    journal (e.g. due to a downtime of the latter).
+
+ The configuration file requires the following entries:
+    - brokers: a list of kafka endpoints (the journal) to which entries will
+        be added.
+ - storage_dbconn: URL to connect to the storage DB.
+ - prefix: the prefix of the topics (topics will be <prefix>.<object_type>).
+ - client_id: the kafka client ID.
"""
conf = ctx.obj['config']
@@ -111,6 +137,7 @@
def main():
+ logging.basicConfig()
return cli(auto_envvar_prefix='SWH_JOURNAL')
diff --git a/swh/journal/client.py b/swh/journal/client.py
--- a/swh/journal/client.py
+++ b/swh/journal/client.py
@@ -7,7 +7,7 @@
import logging
from .serializers import kafka_to_key, kafka_to_value
-
+from swh.journal import DEFAULT_PREFIX
logger = logging.getLogger(__name__)
@@ -46,7 +46,7 @@
"""
def __init__(
- self, brokers, topic_prefix, consumer_id,
+ self, brokers, group_id, prefix=DEFAULT_PREFIX,
object_types=ACCEPTED_OBJECT_TYPES,
max_messages=0, auto_offset_reset='earliest'):
@@ -67,23 +67,17 @@
value_deserializer=kafka_to_value,
auto_offset_reset=auto_offset_reset,
enable_auto_commit=False,
- group_id=consumer_id,
+ group_id=group_id,
)
self.consumer.subscribe(
- topics=['%s.%s' % (topic_prefix, object_type)
+ topics=['%s.%s' % (prefix, object_type)
for object_type in object_types],
)
self.max_messages = max_messages
self._object_types = object_types
- def poll(self):
- return self.consumer.poll()
-
- def commit(self):
- self.consumer.commit()
-
def process(self, worker_fn):
"""Polls Kafka for a batch of messages, and calls the worker_fn
with these messages.
@@ -94,7 +88,7 @@
argument.
"""
nb_messages = 0
- polled = self.poll()
+ polled = self.consumer.poll()
for (partition, messages) in polled.items():
object_type = partition.topic.split('.')[-1]
# Got a message from a topic we did not subscribe to.
@@ -104,5 +98,5 @@
nb_messages += len(messages)
- self.commit()
+ self.consumer.commit()
return nb_messages
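
A short usage sketch of the revised JournalClient API above, wired to
process_replay_objects the way the replay command does. The broker address,
group id and in-memory storage arguments are placeholders, a reachable Kafka
broker is assumed, and the get_storage arguments are an assumption rather
than something defined in this diff.

# Hypothetical example of the new keyword-based constructor and of
# process(), which polls one batch, hands it to worker_fn, then commits.
import functools

from swh.journal.client import JournalClient
from swh.journal.replay import process_replay_objects
from swh.storage import get_storage

storage = get_storage('memory', {})   # assumed get_storage arguments
client = JournalClient(
    brokers=['localhost:9092'],
    group_id='replayer-example',
    prefix='swh.journal.objects',     # optional, defaults to DEFAULT_PREFIX
)
worker_fn = functools.partial(process_replay_objects, storage=storage)

nb_messages = 0
while nb_messages < 1:                # replay at least one batch
    nb_messages += client.process(worker_fn)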
diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py
--- a/swh/journal/tests/conftest.py
+++ b/swh/journal/tests/conftest.py
@@ -163,8 +163,6 @@
TEST_CONFIG = {
- 'temporary_prefix': 'swh.tmp_journal.new',
- 'final_prefix': 'swh.journal.objects',
'consumer_id': 'swh.journal.consumer',
'object_types': OBJECT_TYPE_KEYS.keys(),
'max_messages': 1, # will read 1 message and stops
@@ -182,8 +180,7 @@
return {
**TEST_CONFIG,
'brokers': ['localhost:{}'.format(port)],
- 'temporary_prefix': kafka_prefix + '.swh.tmp_journal.new',
- 'final_prefix': kafka_prefix + '.swh.journal.objects',
+ 'prefix': kafka_prefix + '.swh.journal.objects',
}
@@ -194,7 +191,7 @@
"""
kafka_topics = [
- '%s.%s' % (test_config['final_prefix'], object_type)
+ '%s.%s' % (test_config['prefix'], object_type)
for object_type in test_config['object_types']]
_, kafka_port = kafka_server
consumer = KafkaConsumer(
diff --git a/swh/journal/tests/test_backfill.py b/swh/journal/tests/test_backfill.py
--- a/swh/journal/tests/test_backfill.py
+++ b/swh/journal/tests/test_backfill.py
@@ -12,7 +12,7 @@
TEST_CONFIG = {
'brokers': ['localhost'],
- 'final_prefix': 'swh.tmp_journal.new',
+ 'prefix': 'swh.tmp_journal.new',
'client_id': 'swh.journal.client.test',
'storage_dbconn': 'service=swh-dev',
}
diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py
--- a/swh/journal/tests/test_cli.py
+++ b/swh/journal/tests/test_cli.py
@@ -41,7 +41,7 @@
with tempfile.NamedTemporaryFile('a', suffix='.yml') as config_fd:
config_fd.write(CLI_CONFIG)
config_fd.seek(0)
- args = ['-C' + config_fd.name, '-l', 'DEBUG'] + args
+ args = ['-C' + config_fd.name] + args
result = runner.invoke(cli, args)
if not catch_exceptions and result.exception:
print(result.output)
@@ -75,7 +75,7 @@
result = invoke(False, [
'replay',
'--broker', 'localhost:%d' % port,
- '--consumer-id', 'test-cli-consumer',
+ '--group-id', 'test-cli-consumer',
'--prefix', kafka_prefix,
'--max-messages', '1',
])
diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py
--- a/swh/journal/tests/test_replay.py
+++ b/swh/journal/tests/test_replay.py
@@ -57,8 +57,8 @@
# Fill the storage from Kafka
config = {
'brokers': 'localhost:%d' % kafka_server[1],
- 'consumer_id': 'replayer',
- 'topic_prefix': kafka_prefix,
+ 'group_id': 'replayer',
+ 'prefix': kafka_prefix,
'max_messages': nb_sent,
}
replayer = JournalClient(**config)
diff --git a/swh/journal/tests/test_write_replay.py b/swh/journal/tests/test_write_replay.py
--- a/swh/journal/tests/test_write_replay.py
+++ b/swh/journal/tests/test_write_replay.py
@@ -23,105 +23,50 @@
FakeKafkaPartition = namedtuple('FakeKafkaPartition', 'topic')
-class MockedDirectKafkaWriter(DirectKafkaWriter):
- def __init__(self):
+class MockedKafkaWriter(DirectKafkaWriter):
+ def __init__(self, queue):
self._prefix = 'prefix'
+ self.queue = queue
-
-class MockedJournalClient(JournalClient):
- def __init__(self, object_types=ACCEPTED_OBJECT_TYPES):
- self._object_types = object_types
-
-
-@given(lists(object_dicts(), min_size=1))
-@settings(suppress_health_check=[HealthCheck.too_slow])
-def test_write_replay_same_order(objects):
- committed = False
- queue = []
-
- def send(topic, key, value):
+ def send(self, topic, key, value):
key = kafka_to_key(key_to_kafka(key))
value = kafka_to_value(value_to_kafka(value))
- queue.append({
- FakeKafkaPartition(topic):
- [FakeKafkaMessage(key=key, value=value)]
- })
-
- def poll():
- return queue.pop(0)
+ partition = FakeKafkaPartition(topic)
+ msg = FakeKafkaMessage(key=key, value=value)
+ if self.queue and {partition} == set(self.queue[-1]):
+            # The last message has the same object type; group them together
+ self.queue[-1][partition].append(msg)
+ else:
+ self.queue.append({partition: [msg]})
- def commit():
- nonlocal committed
- if queue == []:
- committed = True
- storage1 = Storage()
- storage1.journal_writer = MockedDirectKafkaWriter()
- storage1.journal_writer.send = send
+class MockedKafkaConsumer:
+ def __init__(self, queue):
+ self.queue = queue
+ self.committed = False
- for (obj_type, obj) in objects:
- obj = obj.copy()
- if obj_type == 'origin_visit':
- origin_id = storage1.origin_add_one(obj.pop('origin'))
- if 'visit' in obj:
- del obj['visit']
- storage1.origin_visit_add(origin_id, **obj)
- else:
- method = getattr(storage1, obj_type + '_add')
- try:
- method([obj])
- except HashCollision:
- pass
+ def poll(self):
+ return self.queue.pop(0)
- storage2 = Storage()
- worker_fn = functools.partial(process_replay_objects, storage=storage2)
- replayer = MockedJournalClient()
- replayer.poll = poll
- replayer.commit = commit
- queue_size = len(queue)
- nb_messages = 0
- while nb_messages < queue_size:
- nb_messages += replayer.process(worker_fn)
+ def commit(self):
+ if self.queue == []:
+ self.committed = True
- assert nb_messages == queue_size
- assert committed
- for attr_name in ('_contents', '_directories', '_revisions', '_releases',
- '_snapshots', '_origin_visits', '_origins'):
- assert getattr(storage1, attr_name) == getattr(storage2, attr_name), \
- attr_name
+class MockedJournalClient(JournalClient):
+ def __init__(self, queue, object_types=ACCEPTED_OBJECT_TYPES):
+ self._object_types = object_types
+ self.consumer = MockedKafkaConsumer(queue)
@given(lists(object_dicts(), min_size=1))
@settings(suppress_health_check=[HealthCheck.too_slow])
def test_write_replay_same_order_batches(objects):
- committed = False
queue = []
-
- def send(topic, key, value):
- key = kafka_to_key(key_to_kafka(key))
- value = kafka_to_value(value_to_kafka(value))
- partition = FakeKafkaPartition(topic)
- msg = FakeKafkaMessage(key=key, value=value)
- if queue and {partition} == set(queue[-1]):
- # The last message is of the same object type, groupping them
- queue[-1][partition].append(msg)
- else:
- queue.append({
- FakeKafkaPartition(topic): [msg]
- })
-
- def poll():
- return queue.pop(0)
-
- def commit():
- nonlocal committed
- if queue == []:
- committed = True
+ replayer = MockedJournalClient(queue)
storage1 = Storage()
- storage1.journal_writer = MockedDirectKafkaWriter()
- storage1.journal_writer.send = send
+ storage1.journal_writer = MockedKafkaWriter(queue)
for (obj_type, obj) in objects:
obj = obj.copy()
@@ -143,14 +88,11 @@
storage2 = Storage()
worker_fn = functools.partial(process_replay_objects, storage=storage2)
- replayer = MockedJournalClient()
- replayer.poll = poll
- replayer.commit = commit
nb_messages = 0
while nb_messages < queue_size:
nb_messages += replayer.process(worker_fn)
- assert committed
+ assert replayer.consumer.committed
for attr_name in ('_contents', '_directories', '_revisions', '_releases',
'_snapshots', '_origin_visits', '_origins'):
