diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,3 +1,3 @@
 swh.core[db,http] >= 0.0.60
 swh.model >= 0.0.40
-swh.storage >= 0.0.147
+swh.storage >= 0.0.156
diff --git a/swh/journal/cli.py b/swh/journal/cli.py
--- a/swh/journal/cli.py
+++ b/swh/journal/cli.py
@@ -93,7 +93,8 @@
         ctx.fail('You must have a storage configured in your config file.')

     client = get_journal_client(
-        ctx, brokers=brokers, prefix=prefix, group_id=group_id)
+        ctx, brokers=brokers, prefix=prefix, group_id=group_id,
+        max_messages=max_messages)
     worker_fn = functools.partial(process_replay_objects, storage=storage)

     try:
@@ -109,6 +110,8 @@
             ctx.exit(0)
         else:
             print('Done.')
+    finally:
+        client.close()


 @cli.command()
@@ -207,7 +210,7 @@

     client = get_journal_client(
         ctx, brokers=brokers, prefix=prefix, group_id=group_id,
-        object_types=('content',))
+        max_messages=max_messages, object_types=('content',))
     worker_fn = functools.partial(process_replay_objects_content,
                                   src=objstorage_src,
                                   dst=objstorage_dst,
diff --git a/swh/journal/client.py b/swh/journal/client.py
--- a/swh/journal/client.py
+++ b/swh/journal/client.py
@@ -141,31 +141,41 @@
             timeout = self.process_timeout - elapsed

-            message = self.consumer.poll(timeout=timeout)
-            if not message:
-                continue
+            num_messages = 20
+
+            if self.max_messages:
+                if nb_messages >= self.max_messages:
+                    break
+                num_messages = min(num_messages, self.max_messages-nb_messages)

-            error = message.error()
-            if error is not None:
-                if error.fatal():
-                    raise KafkaException(error)
-                logger.info('Received non-fatal kafka error: %s', error)
+            messages = self.consumer.consume(
+                timeout=timeout, num_messages=num_messages)
+            if not messages:
                 continue

-            nb_messages += 1
+            for message in messages:
+                error = message.error()
+                if error is not None:
+                    if error.fatal():
+                        raise KafkaException(error)
+                    logger.info('Received non-fatal kafka error: %s', error)
+                    continue

-            object_type = message.topic().split('.')[-1]
-            # Got a message from a topic we did not subscribe to.
-            assert object_type in self._object_types, object_type
+                nb_messages += 1

-            objects[object_type].append(
-                self.value_deserializer(message.value())
-            )
+                object_type = message.topic().split('.')[-1]
+                # Got a message from a topic we did not subscribe to.
+                assert object_type in self._object_types, object_type

-            if nb_messages >= self.max_messages:
-                break
+                objects[object_type].append(
+                    self.value_deserializer(message.value())
+                )

-            worker_fn(dict(objects))
+            if nb_messages:
+                worker_fn(dict(objects))

-            self.consumer.commit()
+                self.consumer.commit()

         return nb_messages
+
+    def close(self):
+        self.consumer.close()
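For context on the client.py change: `poll()` returned a single message per call, while `consume()` returns a list, and `max_messages` now bounds the total across batches. Below is a minimal, self-contained sketch of this consumption pattern against confluent-kafka; the broker address, group id, topic, and `max_messages` value are placeholders, not values taken from swh.journal's configuration.

```python
from confluent_kafka import Consumer, KafkaException

# Placeholder configuration, not swh.journal's.
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'example-replayer',
    'auto.offset.reset': 'earliest',
})
consumer.subscribe(['swh.journal.objects.content'])

max_messages = 100  # total bound, like JournalClient's max_messages
nb_messages = 0
try:
    while not max_messages or nb_messages < max_messages:
        # Cap the batch so the total never overshoots max_messages.
        num_messages = 20
        if max_messages:
            num_messages = min(num_messages, max_messages - nb_messages)
        # consume() returns a possibly-empty list of messages, unlike
        # poll() which returns a single message or None.
        batch = consumer.consume(timeout=1.0, num_messages=num_messages)
        if not batch:
            continue  # the real client also enforces a process timeout
        for message in batch:
            error = message.error()
            if error is not None:
                if error.fatal():
                    raise KafkaException(error)
                continue  # skip non-fatal errors, as the client does
            nb_messages += 1
            print(message.topic(), len(message.value()))
        consumer.commit()  # commit once per processed batch
finally:
    consumer.close()
```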
diff --git a/swh/journal/replay.py b/swh/journal/replay.py
--- a/swh/journal/replay.py
+++ b/swh/journal/replay.py
@@ -89,16 +89,18 @@
     good_visits = []
     for visit in visits:
         visit = visit.copy()
-        if isinstance(visit['origin'], str):
-            # note that it will crash with the pg and
-            # in-mem storages if the origin is not already known,
-            # but there is no other choice because we can't add an
-            # origin without knowing its type. Non-pg storages
-            # don't use a numeric FK internally,
-            visit['origin'] = {'url': visit['origin']}
-        else:
-            if 'type' not in visit:
+        if 'type' not in visit:
+            if isinstance(visit['origin'], dict) and 'type' in visit['origin']:
+                # Very old version of the schema: visits did not have a type,
+                # but their 'origin' field was a dict with a 'type' key.
                 visit['type'] = visit['origin']['type']
+            else:
+                # Very very old version of the schema: 'type' is missing,
+                # so there is nothing we can do to fix it.
+                raise ValueError('Got an origin_visit too old to be replayed.')
+        if isinstance(visit['origin'], dict):
+            # Old version of the schema: visit['origin'] was a dict.
+            visit['origin'] = visit['origin']['url']
         good_visits.append(visit)
     return good_visits

@@ -183,17 +185,20 @@
     []

-    `visit['origin']` is an URL instead of a dict:
+    `visit['origin']` is a dict instead of a URL:

-    >>> fix_objects('origin_visit', [{'origin': 'http://foo'}])
-    [{'origin': {'url': 'http://foo'}}]
+    >>> pprint(fix_objects('origin_visit', [{
+    ...     'origin': {'url': 'http://foo'},
+    ...     'type': 'git',
+    ... }]))
+    [{'origin': 'http://foo', 'type': 'git'}]

     `visit['type']` is missing, but `visit['origin']['type']` exists:

-    >>> pprint(fix_objects(
-    ...     'origin_visit',
-    ...     [{'origin': {'type': 'hg', 'url': 'http://foo'}}]))
-    [{'origin': {'type': 'hg', 'url': 'http://foo'}, 'type': 'hg'}]
+    >>> pprint(fix_objects('origin_visit', [
+    ...     {'origin': {'type': 'hg', 'url': 'http://foo'}
+    ... }]))
+    [{'origin': 'http://foo', 'type': 'hg'}]

     """  # noqa

     if object_type == 'revision':
@@ -220,7 +225,7 @@
             method(objects)
     elif object_type == 'origin_visit':
         for visit in objects:
-            storage.origin_add_one(visit['origin'])
+            storage.origin_add_one({'url': visit['origin']})
             if 'metadata' not in visit:
                 visit['metadata'] = None
         storage.origin_visit_upsert(objects)
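The doctests above pin down the intended behavior for each legacy shape. As a cross-check, here is a standalone restatement of the visit-normalization logic with one example per schema generation; `normalize_visit` is an illustrative name, not part of `swh.journal.replay`.

```python
def normalize_visit(visit):
    # Illustrative restatement of the fix the replayer applies.
    visit = visit.copy()
    if 'type' not in visit:
        if isinstance(visit['origin'], dict) and 'type' in visit['origin']:
            # Very old schema: the visit type lives on the origin dict.
            visit['type'] = visit['origin']['type']
        else:
            # Even older schema: the type is unrecoverable.
            raise ValueError('Got an origin_visit too old to be replayed.')
    if isinstance(visit['origin'], dict):
        # Old schema: the origin is a dict; keep only its URL.
        visit['origin'] = visit['origin']['url']
    return visit

# Current schema: passes through unchanged.
assert normalize_visit({'origin': 'http://foo', 'type': 'git'}) \
    == {'origin': 'http://foo', 'type': 'git'}
# Old schema: the origin dict collapses to its URL.
assert normalize_visit({'origin': {'url': 'http://foo'}, 'type': 'git'}) \
    == {'origin': 'http://foo', 'type': 'git'}
# Very old schema: the type is recovered from the origin dict.
assert normalize_visit({'origin': {'type': 'hg', 'url': 'http://foo'}}) \
    == {'origin': 'http://foo', 'type': 'hg'}
```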
diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py
--- a/swh/journal/tests/conftest.py
+++ b/swh/journal/tests/conftest.py
@@ -128,7 +128,7 @@

 ORIGIN_VISITS = [
     {
-        'origin': ORIGINS[0],
+        'origin': ORIGINS[0]['url'],
         'date': '2013-05-07 04:20:39.369271+00:00',
         'snapshot': None,  # TODO
         'status': 'ongoing',  # TODO
@@ -136,7 +136,7 @@
         'type': 'git',
     },
     {
-        'origin': ORIGINS[0],
+        'origin': ORIGINS[0]['url'],
         'date': '2018-11-27 17:20:39+00:00',
         'snapshot': None,  # TODO
         'status': 'ongoing',  # TODO
diff --git a/swh/journal/tests/test_kafka_writer.py b/swh/journal/tests/test_kafka_writer.py
--- a/swh/journal/tests/test_kafka_writer.py
+++ b/swh/journal/tests/test_kafka_writer.py
@@ -123,12 +123,13 @@
         elif object_type in ('origin_visit',):
             for object_ in objects:
                 object_ = object_.copy()
-                origin_id = storage.origin_add_one(object_.pop('origin'))
-                visit = method(origin=origin_id, date=object_.pop('date'),
+                origin_url = object_.pop('origin')
+                storage.origin_add_one({'url': origin_url})
+                visit = method(origin=origin_url, date=object_.pop('date'),
                                type=object_.pop('type'))
                 expected_messages += 1
                 visit_id = visit['visit']
-                storage.origin_visit_update(origin_id, visit_id, **object_)
+                storage.origin_visit_update(origin_url, visit_id, **object_)
                 expected_messages += 1
         else:
             assert False, object_type
diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py
--- a/swh/journal/tests/test_replay.py
+++ b/swh/journal/tests/test_replay.py
@@ -12,9 +12,9 @@
 import dateutil
 from confluent_kafka import Producer
 from hypothesis import strategies, given, settings
+import pytest

 from swh.storage import get_storage
-from swh.storage.in_memory import ENABLE_ORIGIN_IDS

 from swh.journal.client import JournalClient
 from swh.journal.serializers import key_to_kafka, value_to_kafka
@@ -88,19 +88,18 @@
     assert OBJECT_TYPE_KEYS['origin'][1] == \
         [{'url': orig['url']} for orig in origins]
     for origin in origins:
-        origin_id_or_url = \
-            origin['id'] if ENABLE_ORIGIN_IDS else origin['url']
+        origin_url = origin['url']
         expected_visits = [
             {
                 **visit,
-                'origin': origin_id_or_url,
+                'origin': origin_url,
                 'date': dateutil.parser.parse(visit['date']),
             }
             for visit in OBJECT_TYPE_KEYS['origin_visit'][1]
-            if visit['origin']['url'] == origin['url']
+            if visit['origin'] == origin['url']
         ]
         actual_visits = list(storage.origin_visit_get(
-            origin_id_or_url))
+            origin_url))
         for visit in actual_visits:
             del visit['visit']  # opaque identifier
         assert expected_visits == actual_visits
@@ -137,6 +136,8 @@
         writer.send('origin_visit', 'foo', visit)

     queue_size = len(queue)
+    assert replayer.max_messages == 0
+    replayer.max_messages = queue_size

     storage = get_storage('memory', {})
     worker_fn = functools.partial(process_replay_objects, storage=storage)
@@ -151,17 +152,14 @@
     for vin, vout in zip(visits, actual_visits):
         vin = vin.copy()
         vout = vout.copy()
-        if ENABLE_ORIGIN_IDS:
-            assert vout.pop('origin') == 1
-        else:
-            assert vout.pop('origin') == 'http://example.com/'
+        assert vout.pop('origin') == 'http://example.com/'
         vin.pop('origin')
         vin.setdefault('type', 'git')
         vin.setdefault('metadata', None)
         assert vin == vout


-def test_write_replay_legacy_origin_visit1():
+def test_write_replay_origin_visit():
     """Test origin_visit when the 'origin' is just a string."""
     now = datetime.datetime.now()
     visits = [{
@@ -175,8 +173,23 @@
     _test_write_replay_origin_visit(visits)


+def test_write_replay_legacy_origin_visit1():
+    """Test origin_visit when there is no type."""
+    now = datetime.datetime.now()
+    visits = [{
+        'visit': 1,
+        'origin': 'http://example.com/',
+        'date': now,
+        'status': 'partial',
+        'snapshot': None,
+    }]
+    with pytest.raises(ValueError, match='too old'):
+        _test_write_replay_origin_visit(visits)
+
+
 def test_write_replay_legacy_origin_visit2():
-    """Test origin_visit when 'type' is missing."""
+    """Test origin_visit when 'type' is missing from the visit, but not
+    from the origin."""
     now = datetime.datetime.now()
     visits = [{
         'visit': 1,
@@ -192,6 +205,22 @@
     _test_write_replay_origin_visit(visits)


+def test_write_replay_legacy_origin_visit3():
+    """Test origin_visit when the origin is a dict."""
+    now = datetime.datetime.now()
+    visits = [{
+        'visit': 1,
+        'origin': {
+            'url': 'http://example.com/',
+        },
+        'date': now,
+        'type': 'git',
+        'status': 'partial',
+        'snapshot': None,
+    }]
+    _test_write_replay_origin_visit(visits)
+
+
 hash_strategy = strategies.binary(min_size=20, max_size=20)
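The test_write_replay.py diff below introduces `empty_person_name_email`, which relies on `attr.evolve` to blank out fields before comparing storages. A minimal sketch of `attr.evolve`'s copy-with-replaced-fields semantics, using a hypothetical `Person` class rather than swh.model's actual types:

```python
import attr


@attr.s(frozen=True)
class Person:
    # Hypothetical stand-in for swh.model's person representation.
    fullname = attr.ib()
    name = attr.ib()
    email = attr.ib()


p = Person(fullname=b'Ada <ada@example.com>',
           name=b'Ada', email=b'ada@example.com')
# evolve() builds a new instance with the given fields replaced,
# leaving the original untouched.
p2 = attr.evolve(p, name=b'', email=b'')
assert p2.fullname == p.fullname
assert (p2.name, p2.email) == (b'', b'')
assert p.name == b'Ada'  # the original is unchanged
```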
diff --git a/swh/journal/tests/test_write_replay.py b/swh/journal/tests/test_write_replay.py
--- a/swh/journal/tests/test_write_replay.py
+++ b/swh/journal/tests/test_write_replay.py
@@ -5,6 +5,7 @@

 import functools

+import attr
 from hypothesis import given, settings, HealthCheck
 from hypothesis.strategies import lists

@@ -18,6 +19,32 @@
 from .utils import MockedJournalClient, MockedKafkaWriter


+def empty_person_name_email(rev_or_rel):
+    """Empties the 'name' and 'email' fields of the author and committer
+    of a revision or release, leaving only the fullname."""
+    if getattr(rev_or_rel, 'author', None):
+        rev_or_rel = attr.evolve(
+            rev_or_rel,
+            author=attr.evolve(
+                rev_or_rel.author,
+                name=b'',
+                email=b'',
+            )
+        )
+
+    if getattr(rev_or_rel, 'committer', None):
+        rev_or_rel = attr.evolve(
+            rev_or_rel,
+            committer=attr.evolve(
+                rev_or_rel.committer,
+                name=b'',
+                email=b'',
+            )
+        )
+
+    return rev_or_rel
+
+
 @given(lists(object_dicts(), min_size=1))
 @settings(suppress_health_check=[HealthCheck.too_slow])
 def test_write_replay_same_order_batches(objects):
@@ -30,7 +57,7 @@
     for (obj_type, obj) in objects:
         obj = obj.copy()
         if obj_type == 'origin_visit':
-            storage1.origin_add_one(obj['origin'])
+            storage1.origin_add_one({'url': obj['origin']})
             storage1.origin_visit_upsert([obj])
         else:
             method = getattr(storage1, obj_type + '_add')
@@ -40,6 +67,8 @@
             pass

     queue_size = len(queue)
+    assert replayer.max_messages == 0
+    replayer.max_messages = queue_size

     storage2 = Storage()
     worker_fn = functools.partial(process_replay_objects, storage=storage2)
@@ -49,11 +78,25 @@

     assert replayer.consumer.committed

-    for attr_name in ('_contents', '_directories', '_revisions', '_releases',
+    for attr_name in ('_contents', '_directories',
                       '_snapshots', '_origin_visits', '_origins'):
         assert getattr(storage1, attr_name) == getattr(storage2, attr_name), \
             attr_name

+    # When hypothesis generates a revision and a release with the same
+    # author (or committer) fullname but a different name or email, the
+    # storage will use the first name/email it sees. This first one will
+    # be either the one from the revision or the release, and since there
+    # are no ordering guarantees, storage2 has a 1/2 chance of not seeing
+    # the same order as storage1; therefore we need to strip the names and
+    # emails out before comparing.
+    for attr_name in ('_revisions', '_releases'):
+        items1 = {k: empty_person_name_email(v)
+                  for (k, v) in getattr(storage1, attr_name).items()}
+        items2 = {k: empty_person_name_email(v)
+                  for (k, v) in getattr(storage2, attr_name).items()}
+        assert items1 == items2, attr_name
+

 # TODO: add test for hash collision
@@ -78,6 +121,8 @@
             contents.append(obj)

     queue_size = len(queue)
+    assert replayer.max_messages == 0
+    replayer.max_messages = queue_size

     storage2 = Storage()
     worker_fn = functools.partial(process_replay_objects_content,
diff --git a/swh/journal/tests/utils.py b/swh/journal/tests/utils.py
--- a/swh/journal/tests/utils.py
+++ b/swh/journal/tests/utils.py
@@ -47,8 +47,10 @@
         self.queue = queue
         self.committed = False

-    def poll(self, timeout=None):
-        return self.queue.pop(0)
+    def consume(self, num_messages, timeout=None):
+        L = self.queue[0:num_messages]
+        self.queue[0:num_messages] = []
+        return L

     def commit(self):
         if self.queue == []:
diff --git a/tox.ini b/tox.ini
--- a/tox.ini
+++ b/tox.ini
@@ -1,5 +1,5 @@
 [tox]
-envlist=flake8,mypy,py3-no-origin-ids,py3
+envlist=flake8,mypy,py3

 [testenv:py3]
 passenv=SWH_KAFKA_ROOT
@@ -14,19 +14,6 @@
   --cov-branch \
   --doctest-modules {posargs}

-[testenv:py3-no-origin-ids]
-passenv=SWH_KAFKA_ROOT
-deps =
-  .[testing]
-  pytest-cov
-setenv =
-  SWH_STORAGE_IN_MEMORY_ENABLE_ORIGIN_IDS=false
-  SWH_KAFKA_ROOT = {env:SWH_KAFKA_ROOT:swh/journal/tests/kafka}
-commands =
-  pytest --cov={envsitepackagesdir}/swh/journal \
-         {envsitepackagesdir}/swh/journal \
-         --cov-branch --doctest-modules {posargs}
-
 [testenv:flake8]
 skip_install = true
 deps =
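Finally, a usage sketch of the mocked `consume()` above. It assumes `MockedKafkaConsumer`'s constructor takes the backing queue, as the attributes in the hunk suggest; like the real confluent-kafka API, it returns at most `num_messages` items per call and an empty list once the queue is drained.

```python
from swh.journal.tests.utils import MockedKafkaConsumer

queue = ['m0', 'm1', 'm2', 'm3', 'm4']
consumer = MockedKafkaConsumer(queue)  # assumed constructor signature
assert consumer.consume(num_messages=2) == ['m0', 'm1']   # drains in order
assert consumer.consume(num_messages=10) == ['m2', 'm3', 'm4']  # short batch
assert consumer.consume(num_messages=1) == []  # drained -> empty list
```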