diff --git a/setup.py b/setup.py
index db1d2c8..90b0b1f 100755
--- a/setup.py
+++ b/setup.py
@@ -1,65 +1,69 @@
#!/usr/bin/env python3
# Copyright (C) 2015-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from setuptools import setup, find_packages
from os import path
from io import open
here = path.abspath(path.dirname(__file__))
# Get the long description from the README file
with open(path.join(here, 'README.md'), encoding='utf-8') as f:
long_description = f.read()
def parse_requirements(name=None):
if name:
reqf = 'requirements-%s.txt' % name
else:
reqf = 'requirements.txt'
requirements = []
if not path.exists(reqf):
return requirements
with open(reqf) as f:
for line in f.readlines():
line = line.strip()
if not line or line.startswith('#'):
continue
requirements.append(line)
return requirements
setup(
name='swh.journal',
description='Software Heritage Journal utilities',
long_description=long_description,
long_description_content_type='text/markdown',
author='Software Heritage developers',
author_email='swh-devel@inria.fr',
url='https://forge.softwareheritage.org/diffusion/DJNL/',
packages=find_packages(),
scripts=[],
+ entry_points='''
+ [console_scripts]
+ swh-journal=swh.journal.cli:main
+ ''',
install_requires=parse_requirements() + parse_requirements('swh'),
setup_requires=['vcversioner'],
extras_require={'testing': parse_requirements('test')},
vcversioner={},
include_package_data=True,
classifiers=[
"Programming Language :: Python :: 3",
"Intended Audience :: Developers",
"License :: OSI Approved :: GNU General Public License v3 (GPLv3)",
"Operating System :: OS Independent",
"Development Status :: 5 - Production/Stable",
],
project_urls={
'Bug Reports': 'https://forge.softwareheritage.org/maniphest',
'Funding': 'https://www.softwareheritage.org/donate',
'Source': 'https://forge.softwareheritage.org/source/swh-journal',
},
)
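
The new console_scripts entry point above means that installing the package generates an swh-journal executable dispatching to swh.journal.cli:main; the publisher defined below can then be started as swh-journal --config-file <path> publisher. A minimal sketch of what the generated wrapper amounts to, assuming standard setuptools behavior:

import sys
from swh.journal.cli import main

if __name__ == '__main__':
    # setuptools console_scripts wrappers exit with the entry
    # point's return value
    sys.exit(main())
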
diff --git a/swh/journal/cli.py b/swh/journal/cli.py
new file mode 100644
index 0000000..b498fbb
--- /dev/null
+++ b/swh/journal/cli.py
@@ -0,0 +1,89 @@
+# Copyright (C) 2016-2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import click
+import logging
+import os
+
+from swh.core import config
+from swh.journal.publisher import JournalPublisher
+
+CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
+
+
+@click.group(context_settings=CONTEXT_SETTINGS)
+@click.option('--config-file', '-C', default=None,
+ type=click.Path(exists=True, dir_okay=False,),
+ help="Configuration file.")
+@click.option('--log-level', '-l', default='INFO',
+ type=click.Choice(logging._nameToLevel.keys()),
+ help="Log level (default to INFO)")
+@click.pass_context
+def cli(ctx, config_file, log_level):
+ """Software Heritage Scheduler CLI interface
+
+ Default to use the the local scheduler instance (plugged to the
+ main scheduler db).
+
+ """
+ if not config_file:
+ config_file = os.environ.get('SWH_CONFIG_FILENAME')
+ if not config_file:
+ raise ValueError('You must either pass a config-file parameter '
+ 'or set SWH_CONFIG_FILENAME to target '
+ 'the config-file')
+
+ if not os.path.exists(config_file):
+ raise ValueError('%s does not exist' % config_file)
+
+ conf = config.read(config_file)
+ ctx.ensure_object(dict)
+
+ logger = logging.getLogger(__name__)
+ logger.setLevel(log_level)
+
+ _log = logging.getLogger('kafka')
+ _log.setLevel(logging.INFO)
+
+ ctx.obj['config'] = conf
+ ctx.obj['loglevel'] = log_level
+
+
+@cli.command()
+@click.pass_context
+def publisher(ctx):
+ """Manipulate publisher
+
+ """
+ mandatory_keys = [
+ 'brokers', 'temporary_prefix', 'final_prefix', 'consumer_id',
+ 'publisher_id', 'object_types', 'storage'
+ ]
+
+ conf = ctx.obj['config']
+ missing_keys = []
+ for key in mandatory_keys:
+ if not conf.get(key):
+ missing_keys.append(key)
+
+ if missing_keys:
+ raise click.ClickException(
+ 'Configuration error: The following keys must be'
+ ' provided: %s' % (','.join(missing_keys), ))
+
+ publisher = JournalPublisher(conf)
+ try:
+ while True:
+ publisher.poll()
+ except KeyboardInterrupt:
+ ctx.exit(0)
+
+
+def main():
+ return cli(auto_envvar_prefix='SWH_JOURNAL')
+
+
+if __name__ == '__main__':
+ main()
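
The publisher subcommand above validates its configuration before instantiating JournalPublisher. A sketch of a configuration dict carrying all the mandatory keys, reusing the example values from the DEFAULT_CONFIG block removed from publisher.py below:

config = {
    'brokers': ['localhost'],
    'temporary_prefix': 'swh.tmp_journal.new',  # input topics (object ids)
    'final_prefix': 'swh.journal.objects',      # output topics (reified objects)
    'consumer_id': 'swh.journal.publisher',
    'publisher_id': 'swh.journal.publisher',
    'object_types': ['content', 'revision', 'release'],
    'storage': {'cls': 'remote', 'args': {'url': 'http://localhost:5002/'}},
    # not checked by the CLI, but read by JournalPublisher.__init__
    'max_messages': 10000,
}
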
diff --git a/swh/journal/publisher.py b/swh/journal/publisher.py
index 2e86125..ff0e0f4 100644
--- a/swh/journal/publisher.py
+++ b/swh/journal/publisher.py
@@ -1,239 +1,194 @@
-# Copyright (C) 2016-2018 The Software Heritage developers
+# Copyright (C) 2016-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from collections import defaultdict
import logging
from kafka import KafkaProducer, KafkaConsumer
-from swh.core.config import SWHConfig
from swh.storage import get_storage
from swh.storage.algos import snapshot
from .serializers import kafka_to_key, key_to_kafka
-class JournalPublisher(SWHConfig):
+class JournalPublisher:
"""The journal publisher is a layer in charge of:
- consuming messages from topics (1 topic per object_type)
- reifying the object ids read from those topics (using the storage)
- producing those reified objects to output topics (1 topic per
object type)
The main entry point for this class is the 'poll' method.
"""
- DEFAULT_CONFIG = {
- 'brokers': ('list[str]', ['getty.internal.softwareheritage.org']),
-
- 'temporary_prefix': ('str', 'swh.tmp_journal.new'),
- 'final_prefix': ('str', 'swh.journal.objects'),
-
- 'consumer_id': ('str', 'swh.journal.publisher'),
- 'publisher_id': ('str', 'swh.journal.publisher'),
-
- 'object_types': ('list[str]', ['content', 'revision', 'release']),
-
- 'storage': ('dict', {
- 'cls': 'remote',
- 'args': {
- 'url': 'http://localhost:5002/',
- }
- }),
-
- 'max_messages': ('int', 10000),
- }
-
- CONFIG_BASE_FILENAME = 'journal/publisher'
-
- def __init__(self, extra_configuration=None):
- self.config = config = self.parse_config_file()
- if extra_configuration:
- config.update(extra_configuration)
-
+ def __init__(self, config):
+ self.config = config
self._prepare_storage(config)
self._prepare_journal(config)
-
self.max_messages = self.config['max_messages']
def _prepare_journal(self, config):
"""Prepare the consumer and subscriber instances for the publisher to
actually be able to discuss with the journal.
"""
# yes, the temporary topics contain values that are actually _keys_
self.consumer = KafkaConsumer(
bootstrap_servers=config['brokers'],
value_deserializer=kafka_to_key,
auto_offset_reset='earliest',
enable_auto_commit=False,
group_id=config['consumer_id'],
)
self.producer = KafkaProducer(
bootstrap_servers=config['brokers'],
key_serializer=key_to_kafka,
value_serializer=key_to_kafka,
client_id=config['publisher_id'],
)
logging.debug('Subscribing to object type events: %s' % (
config['object_types'], ))
self.consumer.subscribe(
topics=['%s.%s' % (config['temporary_prefix'], object_type)
for object_type in config['object_types']],
)
def _prepare_storage(self, config):
"""Prepare the storage instance needed for the publisher to be able to
discuss with the storage to retrieve the objects.
"""
self.storage = get_storage(**config['storage'])
def poll(self, max_messages=None):
"""Process a batch of messages from the consumer's topics. Use the
storage to reify those ids. Produces back those reified
objects to the production topics.
This method polls a given amount of message then stops.
The number of messages to consume is either provided or
configured as fallback.
The following method is expected to be called from within a
loop.
"""
messages = defaultdict(list)
if max_messages is None:
max_messages = self.max_messages
for num, message in enumerate(self.consumer):
object_type = message.topic.split('.')[-1]
logging.debug('num: %s, object_type: %s, message: %s' % (
num, object_type, message))
messages[object_type].append(message.value)
if num + 1 >= max_messages:
break
new_objects = self.process_objects(messages)
self.produce_messages(new_objects)
self.consumer.commit()
def process_objects(self, messages):
"""Given a dict of messages {object type: [object id]}, reify those
ids to swh object from the storage and returns a
corresponding dict.
Args:
messages (dict): Dict of {object_type: [id-as-bytes]}
Returns:
Dict of {object_type: [tuple]}.
object_type (str): content, revision, release
tuple (bytes, dict): object id as bytes, object as swh dict.
"""
processors = {
'content': self.process_contents,
'revision': self.process_revisions,
'release': self.process_releases,
'snapshot': self.process_snapshots,
'origin': self.process_origins,
'origin_visit': self.process_origin_visits,
}
return {
key: processors[key](value)
for key, value in messages.items()
}
def produce_messages(self, messages):
"""Produce new swh object to the producer topic.
Args:
messages ([dict]): Dict of {object_type: [tuple]}.
object_type (str): content, revision, release
tuple (bytes, dict): object id as bytes, object as swh dict.
"""
for object_type, objects in messages.items():
topic = '%s.%s' % (self.config['final_prefix'], object_type)
for key, object in objects:
logging.debug('topic: %s, key: %s, value: %s' % (
topic, key, object))
self.producer.send(topic, key=key, value=object)
self.producer.flush()
def process_contents(self, content_objs):
logging.debug('contents: %s' % content_objs)
metadata = self.storage.content_get_metadata(
(c[b'sha1'] for c in content_objs))
return [(content['sha1'], content) for content in metadata]
def process_revisions(self, revision_objs):
logging.debug('revisions: %s' % revision_objs)
metadata = self.storage.revision_get((r[b'id'] for r in revision_objs))
return [(revision['id'], revision)
for revision in metadata if revision]
def process_releases(self, release_objs):
logging.debug('releases: %s' % release_objs)
metadata = self.storage.release_get((r[b'id'] for r in release_objs))
return [(release['id'], release) for release in metadata]
def process_origins(self, origin_objs):
logging.debug('origins: %s' % origin_objs)
r = []
for o in origin_objs:
origin = {'url': o[b'url'], 'type': o[b'type']}
r.append((origin, origin))
return r
def process_origin_visits(self, origin_visits):
logging.debug('origin_visits: %s' % origin_visits)
metadata = []
for ov in origin_visits:
origin_visit = self.storage.origin_visit_get_by(
ov[b'origin'], ov[b'visit'])
if origin_visit:
pk = ov[b'origin'], ov[b'visit']
origin_visit['date'] = str(origin_visit['date'])
metadata.append((pk, origin_visit))
return metadata
def process_snapshots(self, snapshot_objs):
logging.debug('snapshots: %s' % snapshot_objs)
metadata = []
for snap in snapshot_objs:
full_obj = snapshot.snapshot_get_all_branches(
self.storage, snap[b'id'])
metadata.append((full_obj['id'], full_obj))
return metadata
if __name__ == '__main__':
- import click
-
- @click.command()
- @click.option('--verbose', is_flag=True, default=False,
- help='Be verbose if asked.')
- def main(verbose):
- logging.basicConfig(
- level=logging.DEBUG if verbose else logging.INFO,
- format='%(asctime)s %(process)d %(levelname)s %(message)s'
- )
- _log = logging.getLogger('kafka')
- _log.setLevel(logging.INFO)
-
- publisher = JournalPublisher()
- while True:
- publisher.poll()
-
- main()
+ print('Please use the "swh-journal publisher" command')
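
Since the configuration is now injected through the constructor instead of being read from a class-level config file, the publisher is also easy to embed programmatically; a minimal sketch, assuming a config dict like the one above:

from swh.journal.publisher import JournalPublisher

publisher = JournalPublisher(config)
try:
    while True:
        # each call consumes up to max_messages ids, reifies them
        # through the storage and produces them to the final topics
        publisher.poll()
except KeyboardInterrupt:
    pass
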
diff --git a/swh/journal/tests/test_publisher.py b/swh/journal/tests/test_publisher.py
index f634f1f..4040553 100644
--- a/swh/journal/tests/test_publisher.py
+++ b/swh/journal/tests/test_publisher.py
@@ -1,217 +1,223 @@
# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import unittest
from swh.model.hashutil import hash_to_bytes
from swh.journal.publisher import JournalPublisher
from swh.storage.in_memory import Storage
CONTENTS = [
{
'length': 3,
'sha1': hash_to_bytes(
'34973274ccef6ab4dfaaf86599792fa9c3fe4689'),
'sha1_git': b'foo',
'blake2s256': b'bar',
'sha256': b'baz',
'status': 'visible',
},
]
COMMITTER = [
{
'id': 1,
'fullname': 'foo',
},
{
'id': 2,
'fullname': 'bar',
}
]
REVISIONS = [
{
'id': hash_to_bytes('7026b7c1a2af56521e951c01ed20f255fa054238'),
'message': b'hello',
'date': {
'timestamp': {
'seconds': 1234567891,
'microseconds': 0,
},
'offset': 120,
'negative_utc': None,
},
'committer': COMMITTER[0],
'author': COMMITTER[0],
'committer_date': None,
},
{
'id': hash_to_bytes('368a48fe15b7db2383775f97c6b247011b3f14f4'),
'message': b'hello again',
'date': {
'timestamp': {
'seconds': 1234567892,
'microseconds': 0,
},
'offset': 120,
'negative_utc': None,
},
'committer': COMMITTER[1],
'author': COMMITTER[1],
'committer_date': None,
},
]
RELEASES = [
{
'id': hash_to_bytes('d81cc0710eb6cf9efd5b920a8453e1e07157b6cd'),
'name': b'v0.0.1',
'date': {
'timestamp': {
'seconds': 1234567890,
'microseconds': 0,
},
'offset': 120,
'negative_utc': None,
},
'author': COMMITTER[0],
},
]
ORIGINS = [
{
'url': 'https://somewhere.org/den/fox',
'type': 'git',
},
{
'url': 'https://overtherainbow.org/fox/den',
'type': 'svn',
}
]
ORIGIN_VISITS = [
{
'date': '2013-05-07T04:20:39.369271+00:00',
},
{
'date': '2018-11-27T17:20:39.000000+00:00',
}
]
+TEST_CONFIG = {
+ 'brokers': ['localhost'],
+ 'temporary_prefix': 'swh.tmp_journal.new',
+ 'final_prefix': 'swh.journal.objects',
+ 'consumer_id': 'swh.journal.test.publisher',
+ 'publisher_id': 'swh.journal.test.publisher',
+ 'object_types': ['content'],
+ 'max_messages': 3,
+}
-class JournalPublisherTest(JournalPublisher):
- def parse_config_file(self):
- return {
- 'brokers': ['localhost'],
- 'temporary_prefix': 'swh.tmp_journal.new',
- 'final_prefix': 'swh.journal.objects',
- 'consumer_id': 'swh.journal.test.publisher',
- 'publisher_id': 'swh.journal.test.publisher',
- 'object_types': ['content'],
- 'max_messages': 3,
- }
+class JournalPublisherTest(JournalPublisher):
def _prepare_storage(self, config):
self.storage = Storage()
self.storage.content_add({'data': b'42', **c} for c in CONTENTS)
self.storage.revision_add(REVISIONS)
self.storage.release_add(RELEASES)
origins = self.storage.origin_add(ORIGINS)
origin_visits = []
for i, ov in enumerate(ORIGIN_VISITS):
origin_id = origins[i]['id']
ov = self.storage.origin_visit_add(origin_id, ov['date'])
origin_visits.append(ov)
self.origins = origins
self.origin_visits = origin_visits
print("publisher.origin-visits", self.origin_visits)
+
+class JournalPublisherNoKafkaInMemoryStorage(JournalPublisherTest):
+ """A journal publisher with:
+ - no kafka dependency
+ - in-memory storage
+ """
+
def _prepare_journal(self, config):
"""No journal for now
"""
pass
class TestPublisher(unittest.TestCase):
def setUp(self):
- self.publisher = JournalPublisherTest()
+ self.publisher = JournalPublisherNoKafkaInMemoryStorage(TEST_CONFIG)
self.contents = [{b'sha1': c['sha1']} for c in CONTENTS]
self.revisions = [{b'id': c['id']} for c in REVISIONS]
self.releases = [{b'id': c['id']} for c in RELEASES]
# these need id generation from the storage,
# so their initialization differs from the other entities
self.origins = [{b'url': o['url'],
b'type': o['type']}
for o in self.publisher.origins]
self.origin_visits = [{b'origin': ov['origin'],
b'visit': ov['visit']}
for ov in self.publisher.origin_visits]
# full objects
storage = self.publisher.storage
ovs = []
for ov in self.origin_visits:
_ov = storage.origin_visit_get_by(
ov[b'origin'], ov[b'visit'])
_ov['date'] = str(_ov['date'])
ovs.append(_ov)
self.expected_origin_visits = ovs
def test_process_contents(self):
actual_contents = self.publisher.process_contents(self.contents)
expected_contents = [(c['sha1'], c) for c in CONTENTS]
self.assertEqual(actual_contents, expected_contents)
def test_process_revisions(self):
actual_revisions = self.publisher.process_revisions(self.revisions)
expected_revisions = [(c['id'], c) for c in REVISIONS]
self.assertEqual(actual_revisions, expected_revisions)
def test_process_releases(self):
actual_releases = self.publisher.process_releases(self.releases)
expected_releases = [(c['id'], c) for c in RELEASES]
self.assertEqual(actual_releases, expected_releases)
def test_process_origins(self):
actual_origins = self.publisher.process_origins(self.origins)
expected_origins = [({'url': o[b'url'], 'type': o[b'type']},
{'url': o[b'url'], 'type': o[b'type']})
for o in self.origins]
self.assertEqual(actual_origins, expected_origins)
def test_process_origin_visits(self):
actual_ovs = self.publisher.process_origin_visits(self.origin_visits)
expected_ovs = [((ov['origin'], ov['visit']), ov)
for ov in self.expected_origin_visits]
self.assertEqual(actual_ovs, expected_ovs)
def test_process_objects(self):
messages = {
'content': self.contents,
'revision': self.revisions,
'release': self.releases,
'origin': self.origins,
'origin_visit': self.origin_visits,
}
actual_objects = self.publisher.process_objects(messages)
expected_contents = [(c['sha1'], c) for c in CONTENTS]
expected_revisions = [(c['id'], c) for c in REVISIONS]
expected_releases = [(c['id'], c) for c in RELEASES]
expected_origins = [(o, o) for o in ORIGINS]
expected_ovs = [((ov['origin'], ov['visit']), ov)
for ov in self.expected_origin_visits]
expected_objects = {
'content': expected_contents,
'revision': expected_revisions,
'release': expected_releases,
'origin': expected_origins,
'origin_visit': expected_ovs,
}
self.assertEqual(actual_objects, expected_objects)
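
The constructor injection is also what the tests above exploit: _prepare_storage is overridden to load the fixtures into the in-memory storage, and _prepare_journal is a no-op so no Kafka broker is needed. A condensed sketch of the round-trip the suite asserts, reusing the fixtures defined in this module:

publisher = JournalPublisherNoKafkaInMemoryStorage(TEST_CONFIG)
messages = {'content': [{b'sha1': c['sha1']} for c in CONTENTS]}
# process_objects dispatches to process_contents, which reifies the
# ids through storage.content_get_metadata
assert publisher.process_objects(messages) == {
    'content': [(c['sha1'], c) for c in CONTENTS],
}
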
