Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/test_publisher.py
Show First 20 Lines • Show All 61 Lines • ▼ Show 20 Lines | RELEASES = [ | ||||
'microseconds': 0, | 'microseconds': 0, | ||||
}, | }, | ||||
'offset': 120, | 'offset': 120, | ||||
'negative_utc': None, | 'negative_utc': None, | ||||
}, | }, | ||||
}, | }, | ||||
] | ] | ||||
ORIGINS = [ | |||||
{ | |||||
'url': 'https://somewhere.org/den/fox', | |||||
'type': 'git', | |||||
}, | |||||
{ | |||||
'url': 'https://overtherainbow.org/fox/den', | |||||
'type': 'svn', | |||||
} | |||||
] | |||||
class JournalPublisherTest(SWHJournalPublisher): | class JournalPublisherTest(SWHJournalPublisher): | ||||
def parse_config_file(self): | def parse_config_file(self): | ||||
return { | return { | ||||
'brokers': ['localhost'], | 'brokers': ['localhost'], | ||||
'temporary_prefix': 'swh.tmp_journal.new', | 'temporary_prefix': 'swh.tmp_journal.new', | ||||
'final_prefix': 'swh.journal.objects', | 'final_prefix': 'swh.journal.objects', | ||||
'consumer_id': 'swh.journal.test.publisher', | 'consumer_id': 'swh.journal.test.publisher', | ||||
'publisher_id': 'swh.journal.test.publisher', | 'publisher_id': 'swh.journal.test.publisher', | ||||
'object_types': ['content'], | 'object_types': ['content'], | ||||
'max_messages': 3, | 'max_messages': 3, | ||||
} | } | ||||
def _prepare_storage(self, config): | def _prepare_storage(self, config): | ||||
self.storage = Storage() | self.storage = Storage() | ||||
self.storage.content_add({'data': b'42', **c} for c in CONTENTS) | self.storage.content_add({'data': b'42', **c} for c in CONTENTS) | ||||
self.storage.revision_add(REVISIONS) | self.storage.revision_add(REVISIONS) | ||||
self.storage.release_add(RELEASES) | self.storage.release_add(RELEASES) | ||||
self.storage.origin_add(ORIGINS) | |||||
def _prepare_journal(self, config): | def _prepare_journal(self, config): | ||||
"""No journal for now | """No journal for now | ||||
""" | """ | ||||
pass | pass | ||||
class TestPublisher(unittest.TestCase): | class TestPublisher(unittest.TestCase): | ||||
def setUp(self): | def setUp(self): | ||||
self.publisher = JournalPublisherTest() | self.publisher = JournalPublisherTest() | ||||
self.contents = [{b'sha1': c['sha1']} for c in CONTENTS] | self.contents = [{b'sha1': c['sha1']} for c in CONTENTS] | ||||
self.revisions = [{b'id': c['id']} for c in REVISIONS] | self.revisions = [{b'id': c['id']} for c in REVISIONS] | ||||
self.releases = [{b'id': c['id']} for c in RELEASES] | self.releases = [{b'id': c['id']} for c in RELEASES] | ||||
self.origins = ORIGINS | |||||
def test_process_contents(self): | def test_process_contents(self): | ||||
actual_contents = self.publisher.process_contents(self.contents) | actual_contents = self.publisher.process_contents(self.contents) | ||||
expected_contents = [(c['sha1'], c) for c in CONTENTS] | expected_contents = [(c['sha1'], c) for c in CONTENTS] | ||||
self.assertEqual(actual_contents, expected_contents) | self.assertEqual(actual_contents, expected_contents) | ||||
def test_process_revisions(self): | def test_process_revisions(self): | ||||
actual_revisions = self.publisher.process_revisions(self.revisions) | actual_revisions = self.publisher.process_revisions(self.revisions) | ||||
expected_revisions = [(c['id'], c) for c in REVISIONS] | expected_revisions = [(c['id'], c) for c in REVISIONS] | ||||
self.assertEqual(actual_revisions, expected_revisions) | self.assertEqual(actual_revisions, expected_revisions) | ||||
def test_process_releases(self): | def test_process_releases(self): | ||||
actual_releases = self.publisher.process_releases(self.releases) | actual_releases = self.publisher.process_releases(self.releases) | ||||
expected_releases = [(c['id'], c) for c in RELEASES] | expected_releases = [(c['id'], c) for c in RELEASES] | ||||
self.assertEqual(actual_releases, expected_releases) | self.assertEqual(actual_releases, expected_releases) | ||||
def test_process_origins(self): | |||||
actual_releases = self.publisher.process_origins(self.origins) | |||||
expected_releases = self.origins | |||||
self.assertEqual(actual_releases, expected_releases) | |||||
def test_process_objects(self): | def test_process_objects(self): | ||||
messages = { | messages = { | ||||
'content': self.contents, | 'content': self.contents, | ||||
'revision': self.revisions, | 'revision': self.revisions, | ||||
'release': self.releases, | 'release': self.releases, | ||||
'origin': self.origins, | |||||
} | } | ||||
actual_objects = self.publisher.process_objects(messages) | actual_objects = self.publisher.process_objects(messages) | ||||
expected_contents = [(c['sha1'], c) for c in CONTENTS] | expected_contents = [(c['sha1'], c) for c in CONTENTS] | ||||
expected_revisions = [(c['id'], c) for c in REVISIONS] | expected_revisions = [(c['id'], c) for c in REVISIONS] | ||||
expected_releases = [(c['id'], c) for c in RELEASES] | expected_releases = [(c['id'], c) for c in RELEASES] | ||||
expected_origins = ORIGINS | |||||
expected_objects = { | expected_objects = { | ||||
'content': expected_contents, | 'content': expected_contents, | ||||
'revision': expected_revisions, | 'revision': expected_revisions, | ||||
'release': expected_releases, | 'release': expected_releases, | ||||
'origin': expected_origins, | |||||
} | } | ||||
self.assertEqual(actual_objects, expected_objects) | self.assertEqual(actual_objects, expected_objects) |