F7163478: D1540.id5136.diff (16 KB, text/plain)
Attached to D1540: cli: register the 'journal' cli subcommand and improve it
diff --git a/setup.py b/setup.py
--- a/setup.py
+++ b/setup.py
@@ -48,6 +48,8 @@
entry_points='''
[console_scripts]
swh-journal=swh.journal.cli:main
+ [swh.cli.subcommands]
+ journal=swh.journal.cli:cli
''',
install_requires=parse_requirements() + parse_requirements('swh'),
setup_requires=['vcversioner'],
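The new `[swh.cli.subcommands]` entry point is what lets a shared top-level `swh` command discover this `journal` group at runtime. A minimal sketch of how such entry-point discovery typically works is shown below; the loader is an assumption about the general setuptools mechanism, not necessarily swh-core's exact implementation.

# Assumed sketch of entry-point based subcommand discovery. With the setup.py
# entry above, entry_point.name is 'journal' and entry_point.load() returns
# the click group defined in swh.journal.cli.
import pkg_resources
import click

@click.group()
def swh():
    """Illustrative top-level 'swh' command (not swh-core's actual code)."""

for entry_point in pkg_resources.iter_entry_points('swh.cli.subcommands'):
    swh.add_command(entry_point.load(), name=entry_point.name)

# Once discovered, 'swh journal ...' dispatches to the group defined in cli.py.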
diff --git a/swh/journal/__init__.py b/swh/journal/__init__.py
--- a/swh/journal/__init__.py
+++ b/swh/journal/__init__.py
@@ -0,0 +1,7 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+# the default prefix for kafka's topics
+DEFAULT_PREFIX = 'swh.journal.objects'
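DEFAULT_PREFIX is combined with an object type to build Kafka topic names, following the '<prefix>.<object_type>' scheme used by JournalClient further down in this diff. For illustration (the object types listed here are examples, not the full ACCEPTED_OBJECT_TYPES):

# Topic names derived from the default prefix, e.g. 'swh.journal.objects.revision'.
from swh.journal import DEFAULT_PREFIX

for object_type in ('content', 'revision', 'release', 'origin_visit'):
    print('%s.%s' % (DEFAULT_PREFIX, object_type))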
diff --git a/swh/journal/backfill.py b/swh/journal/backfill.py
--- a/swh/journal/backfill.py
+++ b/swh/journal/backfill.py
@@ -366,7 +366,7 @@
yield record
-MANDATORY_KEYS = ['brokers', 'storage_dbconn', 'final_prefix', 'client_id']
+MANDATORY_KEYS = ['brokers', 'storage_dbconn', 'prefix', 'client_id']
class JournalBackfiller:
@@ -430,7 +430,7 @@
db = BaseDb.connect(self.config['storage_dbconn'])
writer = DirectKafkaWriter(
brokers=self.config['brokers'],
- prefix=self.config['final_prefix'],
+ prefix=self.config['prefix'],
client_id=self.config['client_id']
)
for range_start, range_end in RANGE_GENERATORS[object_type](
diff --git a/swh/journal/cli.py b/swh/journal/cli.py
--- a/swh/journal/cli.py
+++ b/swh/journal/cli.py
@@ -9,52 +9,43 @@
import os
from swh.core import config
+from swh.core.cli import CONTEXT_SETTINGS
from swh.storage import get_storage
from swh.journal.client import JournalClient
from swh.journal.replay import process_replay_objects
from swh.journal.backfill import JournalBackfiller
-CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
-
-@click.group(context_settings=CONTEXT_SETTINGS)
+@click.group(name='journal', context_settings=CONTEXT_SETTINGS)
@click.option('--config-file', '-C', default=None,
type=click.Path(exists=True, dir_okay=False,),
help="Configuration file.")
-@click.option('--log-level', '-l', default='INFO',
- type=click.Choice(logging._nameToLevel.keys()),
- help="Log level (default to INFO)")
@click.pass_context
-def cli(ctx, config_file, log_level):
- """Software Heritage Scheduler CLI interface
+def cli(ctx, config_file):
+ """Software Heritage Journal tools.
- Default to use the the local scheduler instance (plugged to the
- main scheduler db).
+ The journal is a persistent logger of changes to the archive, with
+ publish-subscribe support.
"""
if not config_file:
config_file = os.environ.get('SWH_CONFIG_FILENAME')
- if not config_file:
- raise ValueError('You must either pass a config-file parameter '
- 'or set SWH_CONFIG_FILENAME to target '
- 'the config-file')
- if not os.path.exists(config_file):
- raise ValueError('%s does not exist' % config_file)
+ if config_file:
+ if not os.path.exists(config_file):
+ raise ValueError('%s does not exist' % config_file)
+ conf = config.read(config_file)
+ else:
+ conf = {}
- conf = config.read(config_file)
ctx.ensure_object(dict)
- logging.basicConfig(
- level=log_level,
- format='%(asctime)s %(levelname)s %(name)s %(message)s',
- )
-
+ log_level = ctx.obj.get('log_level', logging.INFO)
+ logging.root.setLevel(log_level)
logging.getLogger('kafka').setLevel(logging.INFO)
ctx.obj['config'] = conf
- ctx.obj['loglevel'] = log_level
@cli.command()
@@ -62,22 +53,44 @@
help='Maximum number of objects to replay. Default is to '
'run forever.')
@click.option('--broker', 'brokers', type=str, multiple=True,
+ hidden=True, # prefer config file
help='Kafka broker to connect to.')
-@click.option('--prefix', type=str, default='swh.journal.objects',
+@click.option('--prefix', type=str, default=None,
+ hidden=True, # prefer config file
help='Prefix of Kafka topic names to read from.')
-@click.option('--consumer-id', type=str,
+@click.option('--group-id', '--consumer-id', type=str,
+ hidden=True, # prefer config file
help='Name of the consumer/group id for reading from Kafka.')
@click.pass_context
-def replay(ctx, brokers, prefix, consumer_id, max_messages):
- """Fill a new storage by reading a journal.
+def replay(ctx, brokers, prefix, group_id, max_messages):
+ """Fill a Storage by reading a Journal.
+ There can be several 'replayers' filling a Storage as long as they use
+ the same `group-id`.
"""
- conf = ctx.obj['config']
logger = logging.getLogger(__name__)
- logger.setLevel(ctx.obj['loglevel'])
- storage = get_storage(**conf.pop('storage'))
- client = JournalClient(brokers, prefix, consumer_id)
+ conf = ctx.obj['config']
+ try:
+ storage = get_storage(**conf.pop('storage'))
+ except KeyError:
+ ctx.fail('You must have a storage configured in your config file.')
+
+    if not brokers:
+ brokers = conf.get('journal', {}).get('brokers')
+ if not brokers:
+ ctx.fail('You must specify at least one kafka broker.')
+ if not isinstance(brokers, (list, tuple)):
+ brokers = [brokers]
+
+ if prefix is None:
+ prefix = conf.get('journal', {}).get('prefix')
+
+ if group_id is None:
+ group_id = conf.get('journal', {}).get('group_id')
+
+ client = JournalClient(brokers=brokers, group_id=group_id, prefix=prefix)
worker_fn = functools.partial(process_replay_objects, storage=storage)
+
try:
nb_messages = 0
while not max_messages or nb_messages < max_messages:
@@ -96,7 +109,20 @@
@click.option('--dry-run', is_flag=True, default=False)
@click.pass_context
def backfiller(ctx, object_type, start_object, end_object, dry_run):
- """Manipulate backfiller
+ """Run the backfiller
+
+    The backfiller lists objects from a Storage and produces journal entries
+    from them.
+
+    Typically used to rebuild a journal or to compensate for missing objects
+    in a journal (e.g. due to a downtime of the latter).
+
+ The configuration file requires the following entries:
+    - brokers: a list of kafka endpoints (the journal) to which entries will
+      be added.
+ - storage_dbconn: URL to connect to the storage DB.
+ - prefix: the prefix of the topics (topics will be <prefix>.<object_type>).
+ - client_id: the kafka client ID.
"""
conf = ctx.obj['config']
@@ -111,6 +137,7 @@
def main():
+ logging.basicConfig()
return cli(auto_envvar_prefix='SWH_JOURNAL')
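Since the broker, prefix and group-id options are now hidden, the replay command is mainly driven by the configuration file (or SWH_CONFIG_FILENAME). A hedged sketch of that configuration, written as the dict that config.read() would return, is shown below; the key names come from the lookups above, while the concrete values and the storage arguments are assumptions.

# Configuration sketch for 'swh journal replay' and 'swh journal backfiller'.
# Keys mirror the conf lookups in cli.py; all values are illustrative.
REPLAY_CONFIG = {
    'storage': {                   # passed as get_storage(**conf['storage'])
        'cls': 'remote',           # illustrative storage class
        'args': {'url': 'http://localhost:5002/'},
    },
    'journal': {
        'brokers': ['localhost:9092'],
        'prefix': 'swh.journal.objects',
        'group_id': 'swh.journal.replayer',
    },
}

BACKFILLER_CONFIG = {              # the MANDATORY_KEYS from backfill.py
    'brokers': ['localhost:9092'],
    'storage_dbconn': 'service=swh-dev',
    'prefix': 'swh.journal.objects',
    'client_id': 'swh.journal.backfiller',
}

# Typical invocation, assuming the 'swh' entry-point registration from setup.py
# (file name and message count illustrative):
#   SWH_CONFIG_FILENAME=journal.yml swh journal replay --max-messages 1000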
diff --git a/swh/journal/client.py b/swh/journal/client.py
--- a/swh/journal/client.py
+++ b/swh/journal/client.py
@@ -7,7 +7,7 @@
import logging
from .serializers import kafka_to_key, kafka_to_value
-
+from swh.journal import DEFAULT_PREFIX
logger = logging.getLogger(__name__)
@@ -46,7 +46,7 @@
"""
def __init__(
- self, brokers, topic_prefix, consumer_id,
+ self, brokers, group_id, prefix=DEFAULT_PREFIX,
object_types=ACCEPTED_OBJECT_TYPES,
max_messages=0, auto_offset_reset='earliest'):
@@ -67,23 +67,17 @@
value_deserializer=kafka_to_value,
auto_offset_reset=auto_offset_reset,
enable_auto_commit=False,
- group_id=consumer_id,
+ group_id=group_id,
)
self.consumer.subscribe(
- topics=['%s.%s' % (topic_prefix, object_type)
+ topics=['%s.%s' % (prefix, object_type)
for object_type in object_types],
)
self.max_messages = max_messages
self._object_types = object_types
- def poll(self):
- return self.consumer.poll()
-
- def commit(self):
- self.consumer.commit()
-
def process(self, worker_fn):
"""Polls Kafka for a batch of messages, and calls the worker_fn
with these messages.
@@ -94,7 +88,7 @@
argument.
"""
nb_messages = 0
- polled = self.poll()
+ polled = self.consumer.poll()
for (partition, messages) in polled.items():
object_type = partition.topic.split('.')[-1]
# Got a message from a topic we did not subscribe to.
@@ -104,5 +98,5 @@
nb_messages += len(messages)
- self.commit()
+ self.consumer.commit()
return nb_messages
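For reference, a short usage sketch of the reworked JournalClient API with its new keyword names (brokers, group_id, prefix); the storage construction and the broker address are illustrative assumptions.

# Replaying journal messages into a storage with the new constructor keywords.
import functools

from swh.journal.client import JournalClient
from swh.journal.replay import process_replay_objects
from swh.storage import get_storage

storage = get_storage('memory', {})      # illustrative backend and arguments
client = JournalClient(
    brokers=['localhost:9092'],
    group_id='my-replayer',
    prefix='swh.journal.objects',        # optional, defaults to DEFAULT_PREFIX
)
worker_fn = functools.partial(process_replay_objects, storage=storage)
nb_messages = client.process(worker_fn)  # one poll/commit round trip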
diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py
--- a/swh/journal/tests/conftest.py
+++ b/swh/journal/tests/conftest.py
@@ -163,8 +163,6 @@
TEST_CONFIG = {
- 'temporary_prefix': 'swh.tmp_journal.new',
- 'final_prefix': 'swh.journal.objects',
'consumer_id': 'swh.journal.consumer',
'object_types': OBJECT_TYPE_KEYS.keys(),
'max_messages': 1, # will read 1 message and stops
@@ -182,8 +180,7 @@
return {
**TEST_CONFIG,
'brokers': ['localhost:{}'.format(port)],
- 'temporary_prefix': kafka_prefix + '.swh.tmp_journal.new',
- 'final_prefix': kafka_prefix + '.swh.journal.objects',
+ 'prefix': kafka_prefix + '.swh.journal.objects',
}
@@ -194,7 +191,7 @@
"""
kafka_topics = [
- '%s.%s' % (test_config['final_prefix'], object_type)
+ '%s.%s' % (test_config['prefix'], object_type)
for object_type in test_config['object_types']]
_, kafka_port = kafka_server
consumer = KafkaConsumer(
diff --git a/swh/journal/tests/test_backfill.py b/swh/journal/tests/test_backfill.py
--- a/swh/journal/tests/test_backfill.py
+++ b/swh/journal/tests/test_backfill.py
@@ -12,7 +12,7 @@
TEST_CONFIG = {
'brokers': ['localhost'],
- 'final_prefix': 'swh.tmp_journal.new',
+ 'prefix': 'swh.tmp_journal.new',
'client_id': 'swh.journal.client.test',
'storage_dbconn': 'service=swh-dev',
}
diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py
--- a/swh/journal/tests/test_cli.py
+++ b/swh/journal/tests/test_cli.py
@@ -41,7 +41,7 @@
with tempfile.NamedTemporaryFile('a', suffix='.yml') as config_fd:
config_fd.write(CLI_CONFIG)
config_fd.seek(0)
- args = ['-C' + config_fd.name, '-l', 'DEBUG'] + args
+ args = ['-C' + config_fd.name] + args
result = runner.invoke(cli, args)
if not catch_exceptions and result.exception:
print(result.output)
@@ -75,7 +75,7 @@
result = invoke(False, [
'replay',
'--broker', 'localhost:%d' % port,
- '--consumer-id', 'test-cli-consumer',
+ '--group-id', 'test-cli-consumer',
'--prefix', kafka_prefix,
'--max-messages', '1',
])
diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py
--- a/swh/journal/tests/test_replay.py
+++ b/swh/journal/tests/test_replay.py
@@ -57,8 +57,8 @@
# Fill the storage from Kafka
config = {
'brokers': 'localhost:%d' % kafka_server[1],
- 'consumer_id': 'replayer',
- 'topic_prefix': kafka_prefix,
+ 'group_id': 'replayer',
+ 'prefix': kafka_prefix,
'max_messages': nb_sent,
}
replayer = JournalClient(**config)
diff --git a/swh/journal/tests/test_write_replay.py b/swh/journal/tests/test_write_replay.py
--- a/swh/journal/tests/test_write_replay.py
+++ b/swh/journal/tests/test_write_replay.py
@@ -23,105 +23,50 @@
FakeKafkaPartition = namedtuple('FakeKafkaPartition', 'topic')
-class MockedDirectKafkaWriter(DirectKafkaWriter):
- def __init__(self):
+class MockedKafkaWriter(DirectKafkaWriter):
+ def __init__(self, queue):
self._prefix = 'prefix'
+ self.queue = queue
-
-class MockedJournalClient(JournalClient):
- def __init__(self, object_types=ACCEPTED_OBJECT_TYPES):
- self._object_types = object_types
-
-
-@given(lists(object_dicts(), min_size=1))
-@settings(suppress_health_check=[HealthCheck.too_slow])
-def test_write_replay_same_order(objects):
- committed = False
- queue = []
-
- def send(topic, key, value):
+ def send(self, topic, key, value):
key = kafka_to_key(key_to_kafka(key))
value = kafka_to_value(value_to_kafka(value))
- queue.append({
- FakeKafkaPartition(topic):
- [FakeKafkaMessage(key=key, value=value)]
- })
-
- def poll():
- return queue.pop(0)
+ partition = FakeKafkaPartition(topic)
+ msg = FakeKafkaMessage(key=key, value=value)
+ if self.queue and {partition} == set(self.queue[-1]):
+            # The last message is for the same object type; group them together
+ self.queue[-1][partition].append(msg)
+ else:
+ self.queue.append({partition: [msg]})
- def commit():
- nonlocal committed
- if queue == []:
- committed = True
- storage1 = Storage()
- storage1.journal_writer = MockedDirectKafkaWriter()
- storage1.journal_writer.send = send
+class MockedKafkaConsumer:
+ def __init__(self, queue):
+ self.queue = queue
+ self.committed = False
- for (obj_type, obj) in objects:
- obj = obj.copy()
- if obj_type == 'origin_visit':
- origin_id = storage1.origin_add_one(obj.pop('origin'))
- if 'visit' in obj:
- del obj['visit']
- storage1.origin_visit_add(origin_id, **obj)
- else:
- method = getattr(storage1, obj_type + '_add')
- try:
- method([obj])
- except HashCollision:
- pass
+ def poll(self):
+ return self.queue.pop(0)
- storage2 = Storage()
- worker_fn = functools.partial(process_replay_objects, storage=storage2)
- replayer = MockedJournalClient()
- replayer.poll = poll
- replayer.commit = commit
- queue_size = len(queue)
- nb_messages = 0
- while nb_messages < queue_size:
- nb_messages += replayer.process(worker_fn)
+ def commit(self):
+ if self.queue == []:
+ self.committed = True
- assert nb_messages == queue_size
- assert committed
- for attr_name in ('_contents', '_directories', '_revisions', '_releases',
- '_snapshots', '_origin_visits', '_origins'):
- assert getattr(storage1, attr_name) == getattr(storage2, attr_name), \
- attr_name
+class MockedJournalClient(JournalClient):
+ def __init__(self, queue, object_types=ACCEPTED_OBJECT_TYPES):
+ self._object_types = object_types
+ self.consumer = MockedKafkaConsumer(queue)
@given(lists(object_dicts(), min_size=1))
@settings(suppress_health_check=[HealthCheck.too_slow])
def test_write_replay_same_order_batches(objects):
- committed = False
queue = []
-
- def send(topic, key, value):
- key = kafka_to_key(key_to_kafka(key))
- value = kafka_to_value(value_to_kafka(value))
- partition = FakeKafkaPartition(topic)
- msg = FakeKafkaMessage(key=key, value=value)
- if queue and {partition} == set(queue[-1]):
- # The last message is of the same object type, groupping them
- queue[-1][partition].append(msg)
- else:
- queue.append({
- FakeKafkaPartition(topic): [msg]
- })
-
- def poll():
- return queue.pop(0)
-
- def commit():
- nonlocal committed
- if queue == []:
- committed = True
+ replayer = MockedJournalClient(queue)
storage1 = Storage()
- storage1.journal_writer = MockedDirectKafkaWriter()
- storage1.journal_writer.send = send
+ storage1.journal_writer = MockedKafkaWriter(queue)
for (obj_type, obj) in objects:
obj = obj.copy()
@@ -143,14 +88,11 @@
storage2 = Storage()
worker_fn = functools.partial(process_replay_objects, storage=storage2)
- replayer = MockedJournalClient()
- replayer.poll = poll
- replayer.commit = commit
nb_messages = 0
while nb_messages < queue_size:
nb_messages += replayer.process(worker_fn)
- assert committed
+ assert replayer.consumer.committed
for attr_name in ('_contents', '_directories', '_revisions', '_releases',
'_snapshots', '_origin_visits', '_origins'):