diff --git a/setup.py b/setup.py
index db1d2c8..90b0b1f 100755
--- a/setup.py
+++ b/setup.py
@@ -1,65 +1,69 @@
#!/usr/bin/env python3
# Copyright (C) 2015-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from setuptools import setup, find_packages
from os import path
from io import open
here = path.abspath(path.dirname(__file__))
# Get the long description from the README file
with open(path.join(here, 'README.md'), encoding='utf-8') as f:
long_description = f.read()
def parse_requirements(name=None):
if name:
reqf = 'requirements-%s.txt' % name
else:
reqf = 'requirements.txt'
requirements = []
if not path.exists(reqf):
return requirements
with open(reqf) as f:
for line in f.readlines():
line = line.strip()
if not line or line.startswith('#'):
continue
requirements.append(line)
return requirements
setup(
name='swh.journal',
description='Software Heritage Journal utilities',
long_description=long_description,
long_description_content_type='text/markdown',
author='Software Heritage developers',
author_email='swh-devel@inria.fr',
url='https://forge.softwareheritage.org/diffusion/DJNL/',
packages=find_packages(),
scripts=[],
+ entry_points='''
+ [console_scripts]
+ swh-journal=swh.journal.cli:main
+ ''',
install_requires=parse_requirements() + parse_requirements('swh'),
setup_requires=['vcversioner'],
extras_require={'testing': parse_requirements('test')},
vcversioner={},
include_package_data=True,
classifiers=[
"Programming Language :: Python :: 3",
"Intended Audience :: Developers",
"License :: OSI Approved :: GNU General Public License v3 (GPLv3)",
"Operating System :: OS Independent",
"Development Status :: 5 - Production/Stable",
],
project_urls={
'Bug Reports': 'https://forge.softwareheritage.org/maniphest',
'Funding': 'https://www.softwareheritage.org/donate',
'Source': 'https://forge.softwareheritage.org/source/swh-journal',
},
)
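
The entry_points stanza added above registers a swh-journal console script that resolves to swh.journal.cli:main, the new module introduced in the next hunk. As a rough sketch of what that wiring amounts to once the package is installed (only the script name and module path come from the diff; the wrapper itself is illustrative):

    # Rough equivalent of the console script generated from the entry point above
    # (a sketch; assumes swh.journal is installed so the module is importable).
    from swh.journal.cli import main

    if __name__ == '__main__':
        main()  # dispatches into the click group defined in swh/journal/cli.py
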
diff --git a/swh/journal/cli.py b/swh/journal/cli.py
new file mode 100644
index 0000000..b498fbb
--- /dev/null
+++ b/swh/journal/cli.py
@@ -0,0 +1,89 @@
+# Copyright (C) 2016-2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import click
+import logging
+import os
+
+from swh.core import config
+from swh.journal.publisher import JournalPublisher
+
+CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
+
+
+@click.group(context_settings=CONTEXT_SETTINGS)
+@click.option('--config-file', '-C', default=None,
+ type=click.Path(exists=True, dir_okay=False,),
+ help="Configuration file.")
+@click.option('--log-level', '-l', default='INFO',
+ type=click.Choice(logging._nameToLevel.keys()),
+ help="Log level (default to INFO)")
+@click.pass_context
+def cli(ctx, config_file, log_level):
+ """Software Heritage Scheduler CLI interface
+
+ Default to use the the local scheduler instance (plugged to the
+ main scheduler db).
+
+ """
+ if not config_file:
+ config_file = os.environ.get('SWH_CONFIG_FILENAME')
+ if not config_file:
+ raise ValueError('You must either pass a config-file parameter '
+ 'or set SWH_CONFIG_FILENAME to target '
+ 'the config-file')
+
+ if not os.path.exists(config_file):
+ raise ValueError('%s does not exist' % config_file)
+
+ conf = config.read(config_file)
+ ctx.ensure_object(dict)
+
+ logger = logging.getLogger(__name__)
+ logger.setLevel(log_level)
+
+ _log = logging.getLogger('kafka')
+ _log.setLevel(logging.INFO)
+
+ ctx.obj['config'] = conf
+ ctx.obj['loglevel'] = log_level
+
+
+@cli.command()
+@click.pass_context
+def publisher(ctx):
+ """Manipulate publisher
+
+ """
+ mandatory_keys = [
+ 'brokers', 'temporary_prefix', 'final_prefix', 'consumer_id',
+ 'publisher_id', 'object_types', 'storage'
+ ]
+
+ conf = ctx.obj['config']
+ missing_keys = []
+ for key in mandatory_keys:
+ if not conf.get(key):
+ missing_keys.append(key)
+
+ if missing_keys:
+ raise click.ClickException(
+ 'Configuration error: The following keys must be'
+ ' provided: %s' % (','.join(missing_keys), ))
+
+ publisher = JournalPublisher(conf)
+ try:
+ while True:
+ publisher.poll()
+ except KeyboardInterrupt:
+ ctx.exit(0)
+
+
+def main():
+ return cli(auto_envvar_prefix='SWH_JOURNAL')
+
+
+if __name__ == '__main__':
+ main()
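
The new CLI loads its settings with swh.core.config.read(), either from --config-file or from the path in SWH_CONFIG_FILENAME, and the publisher subcommand refuses to start unless all of the keys in mandatory_keys are present. For reference, a configuration equivalent to the defaults being dropped from publisher.py below might look like this sketch (the keys come from the diff; every value is illustrative, not a recommended deployment):

    # Illustrative configuration dict, as swh.core.config.read() could return it
    # (a sketch; the values mirror the old DEFAULT_CONFIG and are examples only).
    config = {
        'brokers': ['localhost'],
        'temporary_prefix': 'swh.tmp_journal.new',
        'final_prefix': 'swh.journal.objects',
        'consumer_id': 'swh.journal.publisher',
        'publisher_id': 'swh.journal.publisher',
        'object_types': ['content', 'revision', 'release'],
        'storage': {
            'cls': 'remote',
            'args': {'url': 'http://localhost:5002/'},
        },
        'max_messages': 10000,
    }
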
diff --git a/swh/journal/publisher.py b/swh/journal/publisher.py
index 2e86125..ff0e0f4 100644
--- a/swh/journal/publisher.py
+++ b/swh/journal/publisher.py
@@ -1,239 +1,194 @@
-# Copyright (C) 2016-2018 The Software Heritage developers
+# Copyright (C) 2016-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from collections import defaultdict
import logging
from kafka import KafkaProducer, KafkaConsumer
-from swh.core.config import SWHConfig
from swh.storage import get_storage
from swh.storage.algos import snapshot
from .serializers import kafka_to_key, key_to_kafka
-class JournalPublisher(SWHConfig):
+class JournalPublisher:
"""The journal publisher is a layer in charge of:
- consuming messages from topics (1 topic per object_type)
- reifying the object ids read from those topics (using the storage)
- producing those reified objects to output topics (1 topic per
object type)
The main entry point for this class is the 'poll' method.
"""
- DEFAULT_CONFIG = {
- 'brokers': ('list[str]', ['getty.internal.softwareheritage.org']),
-
- 'temporary_prefix': ('str', 'swh.tmp_journal.new'),
- 'final_prefix': ('str', 'swh.journal.objects'),
-
- 'consumer_id': ('str', 'swh.journal.publisher'),
- 'publisher_id': ('str', 'swh.journal.publisher'),
-
- 'object_types': ('list[str]', ['content', 'revision', 'release']),
-
- 'storage': ('dict', {
- 'cls': 'remote',
- 'args': {
- 'url': 'http://localhost:5002/',
- }
- }),
-
- 'max_messages': ('int', 10000),
- }
-
- CONFIG_BASE_FILENAME = 'journal/publisher'
-
- def __init__(self, extra_configuration=None):
- self.config = config = self.parse_config_file()
- if extra_configuration:
- config.update(extra_configuration)
-
+ def __init__(self, config):
+ self.config = config
self._prepare_storage(config)
self._prepare_journal(config)
-
self.max_messages = self.config['max_messages']
def _prepare_journal(self, config):
"""Prepare the consumer and subscriber instances for the publisher to
actually be able to discuss with the journal.
"""
# yes, the temporary topics contain values that are actually _keys_
self.consumer = KafkaConsumer(
bootstrap_servers=config['brokers'],
value_deserializer=kafka_to_key,
auto_offset_reset='earliest',
enable_auto_commit=False,
group_id=config['consumer_id'],
)
self.producer = KafkaProducer(
bootstrap_servers=config['brokers'],
key_serializer=key_to_kafka,
value_serializer=key_to_kafka,
client_id=config['publisher_id'],
)
logging.debug('Subscribing to object types event: %s' % (
config['object_types'], ))
self.consumer.subscribe(
topics=['%s.%s' % (config['temporary_prefix'], object_type)
for object_type in config['object_types']],
)
def _prepare_storage(self, config):
"""Prepare the storage instance needed for the publisher to be able to
discuss with the storage to retrieve the objects.
"""
self.storage = get_storage(**config['storage'])
def poll(self, max_messages=None):
"""Process a batch of messages from the consumer's topics. Use the
storage to reify those ids. Produces back those reified
objects to the production topics.
This method polls a given amount of message then stops.
The number of messages to consume is either provided or
configured as fallback.
The following method is expected to be called from within a
loop.
"""
messages = defaultdict(list)
if max_messages is None:
max_messages = self.max_messages
for num, message in enumerate(self.consumer):
object_type = message.topic.split('.')[-1]
logging.debug('num: %s, object_type: %s, message: %s' % (
num, object_type, message))
messages[object_type].append(message.value)
if num + 1 >= max_messages:
break
new_objects = self.process_objects(messages)
self.produce_messages(new_objects)
self.consumer.commit()
def process_objects(self, messages):
"""Given a dict of messages {object type: [object id]}, reify those
ids to swh object from the storage and returns a
corresponding dict.
Args:
messages (dict): Dict of {object_type: [id-as-bytes]}
Returns:
Dict of {object_type: [tuple]}.
object_type (str): content, revision, release
tuple (bytes, dict): object id as bytes, object as swh dict.
"""
processors = {
'content': self.process_contents,
'revision': self.process_revisions,
'release': self.process_releases,
'snapshot': self.process_snapshots,
'origin': self.process_origins,
'origin_visit': self.process_origin_visits,
}
return {
key: processors[key](value)
for key, value in messages.items()
}
def produce_messages(self, messages):
"""Produce new swh object to the producer topic.
Args:
messages ([dict]): Dict of {object_type: [tuple]}.
object_type (str): content, revision, release
tuple (bytes, dict): object id as bytes, object as swh dict.
"""
for object_type, objects in messages.items():
topic = '%s.%s' % (self.config['final_prefix'], object_type)
for key, object in objects:
logging.debug('topic: %s, key: %s, value: %s' % (
topic, key, object))
self.producer.send(topic, key=key, value=object)
self.producer.flush()
def process_contents(self, content_objs):
logging.debug('contents: %s' % content_objs)
metadata = self.storage.content_get_metadata(
(c[b'sha1'] for c in content_objs))
return [(content['sha1'], content) for content in metadata]
def process_revisions(self, revision_objs):
logging.debug('revisions: %s' % revision_objs)
metadata = self.storage.revision_get((r[b'id'] for r in revision_objs))
return [(revision['id'], revision)
for revision in metadata if revision]
def process_releases(self, release_objs):
logging.debug('releases: %s' % release_objs)
metadata = self.storage.release_get((r[b'id'] for r in release_objs))
return [(release['id'], release) for release in metadata]
def process_origins(self, origin_objs):
logging.debug('origins: %s' % origin_objs)
r = []
for o in origin_objs:
origin = {'url': o[b'url'], 'type': o[b'type']}
r.append((origin, origin))
return r
def process_origin_visits(self, origin_visits):
logging.debug('origin_visits: %s' % origin_visits)
metadata = []
for ov in origin_visits:
origin_visit = self.storage.origin_visit_get_by(
ov[b'origin'], ov[b'visit'])
if origin_visit:
pk = ov[b'origin'], ov[b'visit']
origin_visit['date'] = str(origin_visit['date'])
metadata.append((pk, origin_visit))
return metadata
def process_snapshots(self, snapshot_objs):
logging.debug('snapshots: %s' % snapshot_objs)
metadata = []
for snap in snapshot_objs:
full_obj = snapshot.snapshot_get_all_branches(
self.storage, snap[b'id'])
metadata.append((full_obj['id'], full_obj))
return metadata
if __name__ == '__main__':
- import click
-
- @click.command()
- @click.option('--verbose', is_flag=True, default=False,
- help='Be verbose if asked.')
- def main(verbose):
- logging.basicConfig(
- level=logging.DEBUG if verbose else logging.INFO,
- format='%(asctime)s %(process)d %(levelname)s %(message)s'
- )
- _log = logging.getLogger('kafka')
- _log.setLevel(logging.INFO)
-
- publisher = JournalPublisher()
- while True:
- publisher.poll()
-
- main()
+ print('Please use the "swh-journal publisher" command')
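
With the configuration now injected through the constructor instead of parse_config_file(), driving the publisher from code mirrors what the publisher subcommand does. A minimal sketch, assuming a config file holding the keys shown earlier and reachable Kafka brokers and storage ('journal-publisher.yml' is a hypothetical path):

    # Minimal driver mirroring the `swh-journal publisher` subcommand
    # (a sketch; the config file path is a placeholder).
    from swh.core import config
    from swh.journal.publisher import JournalPublisher

    conf = config.read('journal-publisher.yml')
    publisher = JournalPublisher(conf)
    try:
        while True:
            publisher.poll()  # consume a batch, reify ids via storage, re-publish
    except KeyboardInterrupt:
        pass
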
diff --git a/swh/journal/tests/test_publisher.py b/swh/journal/tests/test_publisher.py
index f634f1f..4040553 100644
--- a/swh/journal/tests/test_publisher.py
+++ b/swh/journal/tests/test_publisher.py
@@ -1,217 +1,223 @@
# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import unittest
from swh.model.hashutil import hash_to_bytes
from swh.journal.publisher import JournalPublisher
from swh.storage.in_memory import Storage
CONTENTS = [
{
'length': 3,
'sha1': hash_to_bytes(
'34973274ccef6ab4dfaaf86599792fa9c3fe4689'),
'sha1_git': b'foo',
'blake2s256': b'bar',
'sha256': b'baz',
'status': 'visible',
},
]
COMMITTER = [
{
'id': 1,
'fullname': 'foo',
},
{
'id': 2,
'fullname': 'bar',
}
]
REVISIONS = [
{
'id': hash_to_bytes('7026b7c1a2af56521e951c01ed20f255fa054238'),
'message': b'hello',
'date': {
'timestamp': {
'seconds': 1234567891,
'microseconds': 0,
},
'offset': 120,
'negative_utc': None,
},
'committer': COMMITTER[0],
'author': COMMITTER[0],
'committer_date': None,
},
{
'id': hash_to_bytes('368a48fe15b7db2383775f97c6b247011b3f14f4'),
'message': b'hello again',
'date': {
'timestamp': {
'seconds': 1234567892,
'microseconds': 0,
},
'offset': 120,
'negative_utc': None,
},
'committer': COMMITTER[1],
'author': COMMITTER[1],
'committer_date': None,
},
]
RELEASES = [
{
'id': hash_to_bytes('d81cc0710eb6cf9efd5b920a8453e1e07157b6cd'),
'name': b'v0.0.1',
'date': {
'timestamp': {
'seconds': 1234567890,
'microseconds': 0,
},
'offset': 120,
'negative_utc': None,
},
'author': COMMITTER[0],
},
]
ORIGINS = [
{
'url': 'https://somewhere.org/den/fox',
'type': 'git',
},
{
'url': 'https://overtherainbow.org/fox/den',
'type': 'svn',
}
]
ORIGIN_VISITS = [
{
'date': '2013-05-07T04:20:39.369271+00:00',
},
{
'date': '2018-11-27T17:20:39.000000+00:00',
}
]
+TEST_CONFIG = {
+ 'brokers': ['localhost'],
+ 'temporary_prefix': 'swh.tmp_journal.new',
+ 'final_prefix': 'swh.journal.objects',
+ 'consumer_id': 'swh.journal.test.publisher',
+ 'publisher_id': 'swh.journal.test.publisher',
+ 'object_types': ['content'],
+ 'max_messages': 3,
+}
-class JournalPublisherTest(JournalPublisher):
- def parse_config_file(self):
- return {
- 'brokers': ['localhost'],
- 'temporary_prefix': 'swh.tmp_journal.new',
- 'final_prefix': 'swh.journal.objects',
- 'consumer_id': 'swh.journal.test.publisher',
- 'publisher_id': 'swh.journal.test.publisher',
- 'object_types': ['content'],
- 'max_messages': 3,
- }
+class JournalPublisherTest(JournalPublisher):
def _prepare_storage(self, config):
self.storage = Storage()
self.storage.content_add({'data': b'42', **c} for c in CONTENTS)
self.storage.revision_add(REVISIONS)
self.storage.release_add(RELEASES)
origins = self.storage.origin_add(ORIGINS)
origin_visits = []
for i, ov in enumerate(ORIGIN_VISITS):
origin_id = origins[i]['id']
ov = self.storage.origin_visit_add(origin_id, ov['date'])
origin_visits.append(ov)
self.origins = origins
self.origin_visits = origin_visits
print("publisher.origin-visits", self.origin_visits)
+
+class JournalPublisherNoKafkaInMemoryStorage(JournalPublisherTest):
+ """A journal publisher with:
+ - no kafka dependency
+ - in-memory storage
+ """
+
def _prepare_journal(self, config):
"""No journal for now
"""
pass
class TestPublisher(unittest.TestCase):
def setUp(self):
- self.publisher = JournalPublisherTest()
+ self.publisher = JournalPublisherNoKafkaInMemoryStorage(TEST_CONFIG)
self.contents = [{b'sha1': c['sha1']} for c in CONTENTS]
self.revisions = [{b'id': c['id']} for c in REVISIONS]
self.releases = [{b'id': c['id']} for c in RELEASES]
# these need id generation from the storage,
# so their initialization differs from the other entities
self.origins = [{b'url': o['url'],
b'type': o['type']}
for o in self.publisher.origins]
self.origin_visits = [{b'origin': ov['origin'],
b'visit': ov['visit']}
for ov in self.publisher.origin_visits]
# full objects
storage = self.publisher.storage
ovs = []
for ov in self.origin_visits:
_ov = storage.origin_visit_get_by(
ov[b'origin'], ov[b'visit'])
_ov['date'] = str(_ov['date'])
ovs.append(_ov)
self.expected_origin_visits = ovs
def test_process_contents(self):
actual_contents = self.publisher.process_contents(self.contents)
expected_contents = [(c['sha1'], c) for c in CONTENTS]
self.assertEqual(actual_contents, expected_contents)
def test_process_revisions(self):
actual_revisions = self.publisher.process_revisions(self.revisions)
expected_revisions = [(c['id'], c) for c in REVISIONS]
self.assertEqual(actual_revisions, expected_revisions)
def test_process_releases(self):
actual_releases = self.publisher.process_releases(self.releases)
expected_releases = [(c['id'], c) for c in RELEASES]
self.assertEqual(actual_releases, expected_releases)
def test_process_origins(self):
actual_origins = self.publisher.process_origins(self.origins)
expected_origins = [({'url': o[b'url'], 'type': o[b'type']},
{'url': o[b'url'], 'type': o[b'type']})
for o in self.origins]
self.assertEqual(actual_origins, expected_origins)
def test_process_origin_visits(self):
actual_ovs = self.publisher.process_origin_visits(self.origin_visits)
expected_ovs = [((ov['origin'], ov['visit']), ov)
for ov in self.expected_origin_visits]
self.assertEqual(actual_ovs, expected_ovs)
def test_process_objects(self):
messages = {
'content': self.contents,
'revision': self.revisions,
'release': self.releases,
'origin': self.origins,
'origin_visit': self.origin_visits,
}
actual_objects = self.publisher.process_objects(messages)
expected_contents = [(c['sha1'], c) for c in CONTENTS]
expected_revisions = [(c['id'], c) for c in REVISIONS]
expected_releases = [(c['id'], c) for c in RELEASES]
expected_origins = [(o, o) for o in ORIGINS]
expected_ovs = [((ov['origin'], ov['visit']), ov)
for ov in self.expected_origin_visits]
expected_objects = {
'content': expected_contents,
'revision': expected_revisions,
'release': expected_releases,
'origin': expected_origins,
'origin_visit': expected_ovs,
}
self.assertEqual(actual_objects, expected_objects)
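
The reworked tests now build the publisher directly from TEST_CONFIG, with the Kafka setup stubbed out and swh.storage.in_memory.Storage standing in for a real storage. The same class can be exercised by hand along these lines (a sketch reusing names from the test module above; the sha1 is the fixture from CONTENTS):

    # Manual sketch reusing the test fixtures above (no Kafka, in-memory storage).
    from swh.model.hashutil import hash_to_bytes
    from swh.journal.tests.test_publisher import (
        JournalPublisherNoKafkaInMemoryStorage, TEST_CONFIG)

    publisher = JournalPublisherNoKafkaInMemoryStorage(TEST_CONFIG)
    sha1 = hash_to_bytes('34973274ccef6ab4dfaaf86599792fa9c3fe4689')
    print(publisher.process_contents([{b'sha1': sha1}]))  # [(sha1, content dict)]
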