diff --git a/requirements-swh.txt b/requirements-swh.txt index 0b5f629..932ebfd 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,3 +1,3 @@ swh.core[db,http] >= 0.0.60 swh.model >= 0.0.60 -swh.storage >= 0.0.177 +swh.storage >= 0.0.178 diff --git a/swh/journal/replay.py b/swh/journal/replay.py index 37bf4fa..83a3227 100644 --- a/swh/journal/replay.py +++ b/swh/journal/replay.py @@ -1,571 +1,586 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import copy import logging from time import time from typing import ( Any, Callable, Dict, Iterable, List, Optional ) from sentry_sdk import capture_exception, push_scope try: from systemd.daemon import notify except ImportError: notify = None from tenacity import ( retry, retry_if_exception_type, stop_after_attempt, wait_random_exponential, ) from swh.core.statsd import statsd from swh.model.identifiers import normalize_timestamp from swh.model.hashutil import hash_to_hex from swh.model.model import ( - BaseContent, BaseModel, Content, Directory, Origin, Revision, + BaseContent, BaseModel, Content, Directory, Origin, OriginVisit, Revision, SHA1_SIZE, SkippedContent, Snapshot, Release ) from swh.objstorage.objstorage import ( ID_HASH_ALGO, ObjNotFoundError, ObjStorage, ) from swh.storage import HashCollision logger = logging.getLogger(__name__) GRAPH_OPERATIONS_METRIC = "swh_graph_replayer_operations_total" GRAPH_DURATION_METRIC = "swh_graph_replayer_duration_seconds" CONTENT_OPERATIONS_METRIC = "swh_content_replayer_operations_total" CONTENT_RETRY_METRIC = "swh_content_replayer_retries_total" CONTENT_BYTES_METRIC = "swh_content_replayer_bytes" CONTENT_DURATION_METRIC = "swh_content_replayer_duration_seconds" object_converter_fn: Dict[str, Callable[[Dict], BaseModel]] = { 'origin': Origin.from_dict, + 'origin_visit': OriginVisit.from_dict, 'snapshot': Snapshot.from_dict, 'revision': Revision.from_dict, 'release': Release.from_dict, 'directory': Directory.from_dict, 'content': Content.from_dict, 'skipped_content': SkippedContent.from_dict, } def process_replay_objects(all_objects, *, storage): for (object_type, objects) in all_objects.items(): logger.debug("Inserting %s %s objects", len(objects), object_type) with statsd.timed(GRAPH_DURATION_METRIC, tags={'object_type': object_type}): _insert_objects(object_type, objects, storage) statsd.increment(GRAPH_OPERATIONS_METRIC, len(objects), tags={'object_type': object_type}) if notify: notify('WATCHDOG=1') def _fix_revision_pypi_empty_string(rev): """PyPI loader failed to encode empty strings as bytes, see: swh:1:rev:8f0095ee0664867055d03de9bcc8f95b91d8a2b9 or https://forge.softwareheritage.org/D1772 """ rev = { **rev, 'author': rev['author'].copy(), 'committer': rev['committer'].copy(), } if rev['author'].get('email') == '': rev['author']['email'] = b'' if rev['author'].get('name') == '': rev['author']['name'] = b'' if rev['committer'].get('email') == '': rev['committer']['email'] = b'' if rev['committer'].get('name') == '': rev['committer']['name'] = b'' return rev def _fix_revision_transplant_source(rev): if rev.get('metadata') and rev['metadata'].get('extra_headers'): rev = copy.deepcopy(rev) rev['metadata']['extra_headers'] = [ [key, value.encode('ascii')] if key == 'transplant_source' and isinstance(value, str) else [key, value] for (key, value) in rev['metadata']['extra_headers']] 
return rev def _check_date(date): """Returns whether the date can be represented in backends with sane limits on timestamps and timezones (resp. signed 64-bits and signed 16 bits), and that microseconds is valid (ie. between 0 and 10^6). """ if date is None: return True date = normalize_timestamp(date) return (-2**63 <= date['timestamp']['seconds'] < 2**63) \ and (0 <= date['timestamp']['microseconds'] < 10**6) \ and (-2**15 <= date['offset'] < 2**15) def _check_revision_date(rev): """Exclude revisions with invalid dates. See https://forge.softwareheritage.org/T1339""" return _check_date(rev['date']) and _check_date(rev['committer_date']) def _fix_revisions(revisions: List[Dict]) -> List[Dict]: """Adapt revisions into a list of (current) storage compatible dicts. >>> from pprint import pprint >>> date = { ... 'timestamp': { ... 'seconds': 1565096932, ... 'microseconds': 0, ... }, ... 'offset': 0, ... } >>> pprint(_fix_revisions([{ ... 'author': {'email': '', 'fullname': b'', 'name': ''}, ... 'committer': {'email': '', 'fullname': b'', 'name': ''}, ... 'date': date, ... 'committer_date': date, ... }])) [{'author': {'email': b'', 'fullname': b'', 'name': b''}, 'committer': {'email': b'', 'fullname': b'', 'name': b''}, 'committer_date': {'offset': 0, 'timestamp': {'microseconds': 0, 'seconds': 1565096932}}, 'date': {'offset': 0, 'timestamp': {'microseconds': 0, 'seconds': 1565096932}}}] Fix type of 'transplant_source' extra headers: >>> revs = _fix_revisions([{ ... 'author': {'email': '', 'fullname': b'', 'name': ''}, ... 'committer': {'email': '', 'fullname': b'', 'name': ''}, ... 'date': date, ... 'committer_date': date, ... 'metadata': { ... 'extra_headers': [ ... ['time_offset_seconds', b'-3600'], ... ['transplant_source', '29c154a012a70f49df983625090434587622b39e'] # noqa ... ]} ... }]) >>> pprint(revs[0]['metadata']['extra_headers']) [['time_offset_seconds', b'-3600'], ['transplant_source', b'29c154a012a70f49df983625090434587622b39e']] Filter out revisions with invalid dates: >>> from copy import deepcopy >>> invalid_date1 = deepcopy(date) >>> invalid_date1['timestamp']['microseconds'] = 1000000000 # > 10^6 >>> _fix_revisions([{ ... 'author': {'email': '', 'fullname': b'', 'name': b''}, ... 'committer': {'email': '', 'fullname': b'', 'name': b''}, ... 'date': invalid_date1, ... 'committer_date': date, ... }]) [] >>> invalid_date2 = deepcopy(date) >>> invalid_date2['timestamp']['seconds'] = 2**70 # > 10^63 >>> _fix_revisions([{ ... 'author': {'email': '', 'fullname': b'', 'name': b''}, ... 'committer': {'email': '', 'fullname': b'', 'name': b''}, ... 'date': invalid_date2, ... 'committer_date': date, ... }]) [] >>> invalid_date3 = deepcopy(date) >>> invalid_date3['offset'] = 2**20 # > 10^15 >>> _fix_revisions([{ ... 'author': {'email': '', 'fullname': b'', 'name': b''}, ... 'committer': {'email': '', 'fullname': b'', 'name': b''}, ... 'date': date, ... 'committer_date': invalid_date3, ... }]) [] """ good_revisions: List = [] for rev in revisions: rev = _fix_revision_pypi_empty_string(rev) rev = _fix_revision_transplant_source(rev) if not _check_revision_date(rev): logging.warning('Excluding revision (invalid date): %r', rev) continue if rev not in good_revisions: good_revisions.append(rev) return good_revisions -def _fix_origin_visits(visits: List[Dict]) -> List[Dict]: - """Adapt origin visits into a list of current storage compatible dicts. +def _fix_origin_visit(visit: Dict) -> OriginVisit: + """Adapt origin visits into a list of current storage compatible + OriginVisits. 
`visit['origin']` is a dict instead of an URL: + >>> from datetime import datetime, timezone >>> from pprint import pprint - >>> pprint(_fix_origin_visits([{ + >>> date = datetime(2020, 2, 27, 14, 39, 19, tzinfo=timezone.utc) + >>> pprint(_fix_origin_visit({ ... 'origin': {'url': 'http://foo'}, + ... 'date': date, ... 'type': 'git', - ... }])) - [{'metadata': None, 'origin': 'http://foo', 'type': 'git'}] + ... 'status': 'ongoing', + ... 'snapshot': None, + ... }).to_dict()) + {'date': datetime.datetime(2020, 2, 27, 14, 39, 19, tzinfo=datetime.timezone.utc), + 'metadata': None, + 'origin': 'http://foo', + 'snapshot': None, + 'status': 'ongoing', + 'type': 'git'} `visit['type']` is missing , but `origin['visit']['type']` exists: - >>> pprint(_fix_origin_visits([ - ... {'origin': {'type': 'hg', 'url': 'http://foo'} - ... }])) - [{'metadata': None, 'origin': 'http://foo', 'type': 'hg'}] - - """ - good_visits = [] - for visit in visits: - visit = visit.copy() - if 'type' not in visit: - if isinstance(visit['origin'], dict) and 'type' in visit['origin']: - # Very old version of the schema: visits did not have a type, - # but their 'origin' field was a dict with a 'type' key. - visit['type'] = visit['origin']['type'] - else: - # Very very old version of the schema: 'type' is missing, - # so there is nothing we can do to fix it. - raise ValueError('Got an origin_visit too old to be replayed.') - if isinstance(visit['origin'], dict): - # Old version of the schema: visit['origin'] was a dict. - visit['origin'] = visit['origin']['url'] - if 'metadata' not in visit: - visit['metadata'] = None - good_visits.append(visit) - return good_visits + >>> pprint(_fix_origin_visit( + ... {'origin': {'type': 'hg', 'url': 'http://foo'}, + ... 'date': date, + ... 'status': 'ongoing', + ... 'snapshot': None, + ... }).to_dict()) + {'date': datetime.datetime(2020, 2, 27, 14, 39, 19, tzinfo=datetime.timezone.utc), + 'metadata': None, + 'origin': 'http://foo', + 'snapshot': None, + 'status': 'ongoing', + 'type': 'hg'} + + """ # noqa + visit = visit.copy() + if 'type' not in visit: + if isinstance(visit['origin'], dict) and 'type' in visit['origin']: + # Very old version of the schema: visits did not have a type, + # but their 'origin' field was a dict with a 'type' key. + visit['type'] = visit['origin']['type'] + else: + # Very very old version of the schema: 'type' is missing, + # so there is nothing we can do to fix it. + raise ValueError('Got an origin_visit too old to be replayed.') + if isinstance(visit['origin'], dict): + # Old version of the schema: visit['origin'] was a dict. + visit['origin'] = visit['origin']['url'] + if 'metadata' not in visit: + visit['metadata'] = None + return OriginVisit.from_dict(visit) def collision_aware_content_add( content_add_fn: Callable[[Iterable[Any]], None], contents: List[BaseContent]) -> None: """Add contents to storage. If a hash collision is detected, an error is logged. Then this adds the other non colliding contents to the storage. 
Args: content_add_fn: Storage content callable contents: List of contents or skipped contents to add to storage """ if not contents: return colliding_content_hashes: List[Dict[str, Any]] = [] while True: try: content_add_fn(contents) except HashCollision as e: algo, hash_id, colliding_hashes = e.args hash_id = hash_to_hex(hash_id) colliding_content_hashes.append({ 'algo': algo, 'hash': hash_to_hex(hash_id), 'objects': [{k: hash_to_hex(v) for k, v in collision.items()} for collision in colliding_hashes] }) # Drop the colliding contents from the transaction contents = [c for c in contents if c.hashes() not in colliding_hashes] else: # Successfully added contents, we are done break if colliding_content_hashes: for collision in colliding_content_hashes: logger.error('Collision detected: %(collision)s', { 'collision': collision }) def _insert_objects(object_type: str, objects: List[Dict], storage) -> None: """Insert objects of type object_type in the storage. """ if object_type == 'content': contents: List[BaseContent] = [] skipped_contents: List[BaseContent] = [] for content in objects: c = BaseContent.from_dict(content) if isinstance(c, SkippedContent): skipped_contents.append(c) else: contents.append(c) collision_aware_content_add( storage.skipped_content_add, skipped_contents) collision_aware_content_add( storage.content_add_metadata, contents) elif object_type == 'revision': storage.revision_add( Revision.from_dict(r) for r in _fix_revisions(objects) ) elif object_type == 'origin_visit': - visits = _fix_origin_visits(objects) - storage.origin_add(Origin(url=v['origin']) for v in visits) - # FIXME: Should be List[OriginVisit], working on fixing - # swh.storage.origin_visit_upsert (D2813) + visits = [_fix_origin_visit(v) for v in objects] + storage.origin_add(Origin(url=v.origin) for v in visits) storage.origin_visit_upsert(visits) elif object_type in ('directory', 'release', 'snapshot', 'origin'): method = getattr(storage, object_type + '_add') method(object_converter_fn[object_type](o) for o in objects) else: logger.warning('Received a series of %s, this should not happen', object_type) def is_hash_in_bytearray(hash_, array, nb_hashes, hash_size=SHA1_SIZE): """ Checks if the given hash is in the provided `array`. The array must be a *sorted* list of sha1 hashes, and contain `nb_hashes` hashes (so its size must by `nb_hashes*hash_size` bytes). Args: hash_ (bytes): the hash to look for array (bytes): a sorted concatenated array of hashes (may be of any type supporting slice indexing, eg. 
:class:`mmap.mmap`) nb_hashes (int): number of hashes in the array hash_size (int): size of a hash (defaults to 20, for SHA1) Example: >>> import os >>> hash1 = os.urandom(20) >>> hash2 = os.urandom(20) >>> hash3 = os.urandom(20) >>> array = b''.join(sorted([hash1, hash2])) >>> is_hash_in_bytearray(hash1, array, 2) True >>> is_hash_in_bytearray(hash2, array, 2) True >>> is_hash_in_bytearray(hash3, array, 2) False """ if len(hash_) != hash_size: raise ValueError('hash_ does not match the provided hash_size.') def get_hash(position): return array[position*hash_size:(position+1)*hash_size] # Regular dichotomy: left = 0 right = nb_hashes while left < right-1: middle = int((right+left)/2) pivot = get_hash(middle) if pivot == hash_: return True elif pivot < hash_: left = middle else: right = middle return get_hash(left) == hash_ class ReplayError(Exception): """An error occurred during the replay of an object""" def __init__(self, operation, *, obj_id, exc): self.operation = operation self.obj_id = hash_to_hex(obj_id) self.exc = exc def __str__(self): return "ReplayError(doing %s, %s, %s)" % ( self.operation, self.obj_id, self.exc ) def log_replay_retry(retry_obj, sleep, last_result): """Log a retry of the content replayer""" exc = last_result.exception() logger.debug('Retry operation %(operation)s on %(obj_id)s: %(exc)s', {'operation': exc.operation, 'obj_id': exc.obj_id, 'exc': str(exc.exc)}) statsd.increment(CONTENT_RETRY_METRIC, tags={ 'operation': exc.operation, 'attempt': str(retry_obj.statistics['attempt_number']), }) def log_replay_error(last_attempt): """Log a replay error to sentry""" exc = last_attempt.exception() with push_scope() as scope: scope.set_tag('operation', exc.operation) scope.set_extra('obj_id', exc.obj_id) capture_exception(exc.exc) logger.error( 'Failed operation %(operation)s on %(obj_id)s after %(retries)s' ' retries: %(exc)s', { 'obj_id': exc.obj_id, 'operation': exc.operation, 'exc': str(exc.exc), 'retries': last_attempt.attempt_number, }) return None CONTENT_REPLAY_RETRIES = 3 content_replay_retry = retry( retry=retry_if_exception_type(ReplayError), stop=stop_after_attempt(CONTENT_REPLAY_RETRIES), wait=wait_random_exponential(multiplier=1, max=60), before_sleep=log_replay_retry, retry_error_callback=log_replay_error, ) @content_replay_retry def copy_object(obj_id, src, dst): hex_obj_id = hash_to_hex(obj_id) obj = '' try: with statsd.timed(CONTENT_DURATION_METRIC, tags={'request': 'get'}): obj = src.get(obj_id) logger.debug('retrieved %(obj_id)s', {'obj_id': hex_obj_id}) with statsd.timed(CONTENT_DURATION_METRIC, tags={'request': 'put'}): dst.add(obj, obj_id=obj_id, check_presence=False) logger.debug('copied %(obj_id)s', {'obj_id': hex_obj_id}) statsd.increment(CONTENT_BYTES_METRIC, len(obj)) except ObjNotFoundError: logger.error('Failed to copy %(obj_id)s: object not found', {'obj_id': hex_obj_id}) raise except Exception as exc: raise ReplayError('copy', obj_id=obj_id, exc=exc) from None return len(obj) @content_replay_retry def obj_in_objstorage(obj_id, dst): """Check if an object is already in an objstorage, tenaciously""" try: return obj_id in dst except Exception as exc: raise ReplayError('in_dst', obj_id=obj_id, exc=exc) from None def process_replay_objects_content( all_objects: Dict[str, List[dict]], *, src: ObjStorage, dst: ObjStorage, exclude_fn: Optional[Callable[[dict], bool]] = None, check_dst: bool = True, ): """ Takes a list of records from Kafka (see :py:func:`swh.journal.client.JournalClient.process`) and copies them from the `src` objstorage to the 
`dst` objstorage, if: * `obj['status']` is `'visible'` * `exclude_fn(obj)` is `False` (if `exclude_fn` is provided) * `obj['sha1'] not in dst` (if `check_dst` is True) Args: all_objects: Objects passed by the Kafka client. Most importantly, `all_objects['content'][*]['sha1']` is the sha1 hash of each content. src: An object storage (see :py:func:`swh.objstorage.get_objstorage`) dst: An object storage (see :py:func:`swh.objstorage.get_objstorage`) exclude_fn: Determines whether an object should be copied. check_dst: Determines whether we should check the destination objstorage before copying. Example: >>> from swh.objstorage import get_objstorage >>> src = get_objstorage('memory', {}) >>> dst = get_objstorage('memory', {}) >>> id1 = src.add(b'foo bar') >>> id2 = src.add(b'baz qux') >>> kafka_partitions = { ... 'content': [ ... { ... 'sha1': id1, ... 'status': 'visible', ... }, ... { ... 'sha1': id2, ... 'status': 'visible', ... }, ... ] ... } >>> process_replay_objects_content( ... kafka_partitions, src=src, dst=dst, ... exclude_fn=lambda obj: obj['sha1'] == id1) >>> id1 in dst False >>> id2 in dst True """ vol = [] nb_skipped = 0 nb_failures = 0 t0 = time() for (object_type, objects) in all_objects.items(): if object_type != 'content': logger.warning( 'Received a series of %s, this should not happen', object_type) continue for obj in objects: obj_id = obj[ID_HASH_ALGO] if obj['status'] != 'visible': nb_skipped += 1 logger.debug('skipped %s (status=%s)', hash_to_hex(obj_id), obj['status']) statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "skipped", "status": obj["status"]}) elif exclude_fn and exclude_fn(obj): nb_skipped += 1 logger.debug('skipped %s (manually excluded)', hash_to_hex(obj_id)) statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "excluded"}) elif check_dst and obj_in_objstorage(obj_id, dst): nb_skipped += 1 logger.debug('skipped %s (in dst)', hash_to_hex(obj_id)) statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "in_dst"}) else: try: copied = copy_object(obj_id, src, dst) except ObjNotFoundError: nb_skipped += 1 statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "not_in_src"}) else: if copied is None: nb_failures += 1 statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "failed"}) else: vol.append(copied) statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "copied"}) dt = time() - t0 logger.info( 'processed %s content objects in %.1fsec ' '(%.1f obj/sec, %.1fMB/sec) - %d failed - %d skipped', len(vol), dt, len(vol)/dt, sum(vol)/1024/1024/dt, nb_failures, nb_skipped) if notify: notify('WATCHDOG=1') diff --git a/swh/journal/tests/test_kafka_writer.py b/swh/journal/tests/test_kafka_writer.py index 6a8e402..90861a2 100644 --- a/swh/journal/tests/test_kafka_writer.py +++ b/swh/journal/tests/test_kafka_writer.py @@ -1,162 +1,161 @@ # Copyright (C) 2018-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict import datetime from confluent_kafka import Consumer, KafkaException from subprocess import Popen from typing import List, Tuple from swh.storage import get_storage from swh.journal.replay import object_converter_fn from swh.journal.serializers import ( kafka_to_key, kafka_to_value ) from swh.journal.writer.kafka import KafkaJournalWriter from swh.model.model import Content, Origin, BaseModel from .conftest 
import OBJECT_TYPE_KEYS def assert_written(consumer, kafka_prefix, expected_messages): consumed_objects = defaultdict(list) fetched_messages = 0 retries_left = 1000 while fetched_messages < expected_messages: if retries_left == 0: raise ValueError('Timed out fetching messages from kafka') msg = consumer.poll(timeout=0.01) if not msg: retries_left -= 1 continue error = msg.error() if error is not None: if error.fatal(): raise KafkaException(error) retries_left -= 1 continue fetched_messages += 1 consumed_objects[msg.topic()].append( (kafka_to_key(msg.key()), kafka_to_value(msg.value())) ) for (object_type, (key_name, objects)) in OBJECT_TYPE_KEYS.items(): topic = kafka_prefix + '.' + object_type (keys, values) = zip(*consumed_objects[topic]) if key_name: assert list(keys) == [object_[key_name] for object_ in objects] else: pass # TODO if object_type == 'origin_visit': for value in values: del value['visit'] elif object_type == 'content': for value in values: del value['ctime'] for object_ in objects: assert object_ in values def test_kafka_writer( kafka_prefix: str, kafka_server: Tuple[Popen, int], consumer: Consumer): kafka_prefix += '.swh.journal.objects' config = { 'brokers': ['localhost:%d' % kafka_server[1]], 'client_id': 'kafka_writer', 'prefix': kafka_prefix, 'producer_config': { 'message.max.bytes': 100000000, } } writer = KafkaJournalWriter(**config) expected_messages = 0 for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items(): for (num, object_) in enumerate(objects): if object_type == 'origin_visit': object_ = {**object_, 'visit': num} if object_type == 'content': object_ = {**object_, 'ctime': datetime.datetime.now()} writer.write_addition(object_type, object_) expected_messages += 1 assert_written(consumer, kafka_prefix, expected_messages) def test_storage_direct_writer( kafka_prefix: str, kafka_server: Tuple[Popen, int], consumer: Consumer): kafka_prefix += '.swh.journal.objects' writer_config = { 'cls': 'kafka', 'brokers': ['localhost:%d' % kafka_server[1]], 'client_id': 'kafka_writer', 'prefix': kafka_prefix, 'producer_config': { 'message.max.bytes': 100000000, } } storage_config = { 'cls': 'pipeline', 'steps': [ {'cls': 'memory', 'journal_writer': writer_config}, ] } storage = get_storage(**storage_config) expected_messages = 0 for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items(): method = getattr(storage, object_type + '_add') if object_type in ('content', 'directory', 'revision', 'release', 'snapshot', 'origin'): objects_: List[BaseModel] if object_type == 'content': objects_ = [ Content.from_dict({ **obj, 'data': b''}) for obj in objects ] else: objects_ = [ object_converter_fn[object_type](obj) for obj in objects ] method(objects_) expected_messages += len(objects) elif object_type in ('origin_visit',): for object_ in objects: object_ = object_.copy() origin_url = object_.pop('origin') storage.origin_add_one(Origin(url=origin_url)) - visit = method(origin=origin_url, date=object_.pop('date'), + visit = method(origin_url, date=object_.pop('date'), type=object_.pop('type')) expected_messages += 1 - visit_id = visit['visit'] - storage.origin_visit_update(origin_url, visit_id, **object_) + storage.origin_visit_update(origin_url, visit.visit, **object_) expected_messages += 1 else: assert False, object_type assert_written(consumer, kafka_prefix, expected_messages) diff --git a/swh/journal/tests/test_write_replay.py b/swh/journal/tests/test_write_replay.py index a2b06c4..d096da6 100644 --- a/swh/journal/tests/test_write_replay.py +++ 
b/swh/journal/tests/test_write_replay.py @@ -1,159 +1,161 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import functools from unittest.mock import patch import attr from hypothesis import given, settings, HealthCheck from hypothesis.strategies import lists -from swh.model.hypothesis_strategies import object_dicts, present_contents +from swh.model.hypothesis_strategies import ( + object_dicts, present_contents +) from swh.model.model import Origin from swh.storage import get_storage, HashCollision from swh.journal.replay import ( process_replay_objects, process_replay_objects_content, object_converter_fn ) from .utils import MockedJournalClient, MockedKafkaWriter storage_config = { - 'cls': 'pipeline', - 'steps': [ - {'cls': 'memory', 'journal_writer': {'cls': 'memory'}}, - ] + 'cls': 'memory', + 'journal_writer': {'cls': 'memory'}, } def empty_person_name_email(rev_or_rel): """Empties the 'name' and 'email' fields of the author/committer fields of a revision or release; leaving only the fullname.""" if getattr(rev_or_rel, 'author', None): rev_or_rel = attr.evolve( rev_or_rel, author=attr.evolve( rev_or_rel.author, name=b'', email=b'', ) ) if getattr(rev_or_rel, 'committer', None): rev_or_rel = attr.evolve( rev_or_rel, committer=attr.evolve( rev_or_rel.committer, name=b'', email=b'', ) ) return rev_or_rel @given(lists(object_dicts(), min_size=1)) @settings(suppress_health_check=[HealthCheck.too_slow]) def test_write_replay_same_order_batches(objects): queue = [] replayer = MockedJournalClient(queue) with patch('swh.journal.writer.inmemory.InMemoryJournalWriter', return_value=MockedKafkaWriter(queue)): storage1 = get_storage(**storage_config) # Write objects to storage1 for (obj_type, obj) in objects: - obj = obj.copy() + if obj_type == 'content' and obj.get('status') == 'absent': + obj_type = 'skipped_content' + + obj = object_converter_fn[obj_type](obj) + if obj_type == 'origin_visit': - storage1.origin_add_one(Origin(url=obj['origin'])) + storage1.origin_add_one(Origin(url=obj.origin)) storage1.origin_visit_upsert([obj]) else: - if obj_type == 'content' and obj.get('status') == 'absent': - obj_type = 'skipped_content' method = getattr(storage1, obj_type + '_add') try: - method([object_converter_fn[obj_type](obj)]) + method([obj]) except HashCollision: pass # Bail out early if we didn't insert any relevant objects... queue_size = len(queue) assert queue_size != 0, "No test objects found; hypothesis strategy bug?" assert replayer.stop_after_objects is None replayer.stop_after_objects = queue_size storage2 = get_storage(**storage_config) worker_fn = functools.partial(process_replay_objects, storage=storage2) replayer.process(worker_fn) assert replayer.consumer.committed for attr_name in ('_contents', '_directories', '_snapshots', '_origin_visits', '_origins'): assert getattr(storage1, attr_name) == getattr(storage2, attr_name), \ attr_name # When hypothesis generates a revision and a release with same # author (or committer) fullname but different name or email, then # the storage will use the first name/email it sees. # This first one will be either the one from the revision or the release, # and since there is no order guarantees, storage2 has 1/2 chance of # not seeing the same order as storage1, therefore we need to strip # them out before comparing. 
for attr_name in ('_revisions', '_releases'): items1 = {k: empty_person_name_email(v) for (k, v) in getattr(storage1, attr_name).items()} items2 = {k: empty_person_name_email(v) for (k, v) in getattr(storage2, attr_name).items()} assert items1 == items2, attr_name # TODO: add test for hash collision @given(lists(present_contents(), min_size=1)) @settings(suppress_health_check=[HealthCheck.too_slow]) def test_write_replay_content(objects): queue = [] replayer = MockedJournalClient(queue) with patch('swh.journal.writer.inmemory.InMemoryJournalWriter', return_value=MockedKafkaWriter(queue)): storage1 = get_storage(**storage_config) contents = [] for obj in objects: storage1.content_add([obj]) contents.append(obj) # Bail out early if we didn't insert any relevant objects... queue_size = len(queue) assert queue_size != 0, "No test objects found; hypothesis strategy bug?" assert replayer.stop_after_objects is None replayer.stop_after_objects = queue_size storage2 = get_storage(**storage_config) objstorage1 = storage1.objstorage.objstorage objstorage2 = storage2.objstorage.objstorage worker_fn = functools.partial(process_replay_objects_content, src=objstorage1, dst=objstorage2) replayer.process(worker_fn) # only content with status visible will be copied in storage2 expected_objstorage_state = { c.sha1: c.data for c in contents if c.status == 'visible' } assert expected_objstorage_state == objstorage2.state
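
The heart of this patch is `_fix_origin_visit`: it now adapts one legacy journal message at a time into an `OriginVisit` model object, so `_insert_objects` can hand the result straight to `storage.origin_visit_upsert` instead of passing raw dicts. A minimal usage sketch, mirroring the doctests above (the repository URL is illustrative; this assumes swh.journal is installed):

    from datetime import datetime, timezone

    from swh.journal.replay import _fix_origin_visit

    # Old-schema message: 'origin' is a dict and the visit type lives inside it.
    legacy_visit = {
        'origin': {'type': 'hg', 'url': 'http://example.org/repo'},
        'date': datetime(2020, 2, 27, 14, 39, 19, tzinfo=timezone.utc),
        'status': 'ongoing',
        'snapshot': None,
    }

    visit = _fix_origin_visit(legacy_visit)
    assert visit.origin == 'http://example.org/repo'  # origin flattened to a plain URL
    assert visit.type == 'hg'                         # type hoisted out of the origin dict
    assert visit.metadata is None                     # defaulted when absent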
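
`collision_aware_content_add` retries the whole batch, dropping whichever contents collided on the previous attempt, until the storage accepts it; only then are the collisions logged. A self-contained sketch of that drop-and-retry pattern with stand-in names (`FakeHashCollision`, `add_dropping_collisions` and `toy_add` are invented for illustration and do not use the real swh.storage API; the real code matches `content.hashes()` against the collision payload):

    class FakeHashCollision(Exception):
        """Stand-in for swh.storage.HashCollision; args = (algo, hash_id, colliding_hashes)."""

    def add_dropping_collisions(add_fn, items, key):
        """Keep calling add_fn(items), dropping colliding items, until the batch is accepted."""
        dropped = []
        while items:
            try:
                add_fn(items)
            except FakeHashCollision as exc:
                _algo, _hash_id, colliding = exc.args
                dropped.extend(colliding)
                # Remove the colliders from the transaction and retry with the rest.
                items = [item for item in items if key(item) not in colliding]
            else:
                break  # batch accepted
        return dropped

    # Toy backend: rejects 'dupe' exactly once, then accepts whatever remains.
    stored, rejected_once = [], set()

    def toy_add(items):
        if 'dupe' in items and 'dupe' not in rejected_once:
            rejected_once.add('dupe')
            raise FakeHashCollision('sha1', 'dupe', ['dupe'])
        stored.extend(items)

    assert add_dropping_collisions(toy_add, ['ok', 'dupe'], key=lambda x: x) == ['dupe']
    assert stored == ['ok']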
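
`copy_object` and `obj_in_objstorage` wrap transient failures in `ReplayError` so that a single tenacity policy (CONTENT_REPLAY_RETRIES attempts, random exponential backoff, logging callbacks) governs both. A stripped-down sketch of that decorator shape, using only the tenacity primitives already imported in replay.py; the flaky function, attempt count and wait parameters are invented for the example, and the before_sleep/retry_error_callback hooks of the real code are left out because their signatures vary across tenacity versions:

    from tenacity import (
        retry, retry_if_exception_type, stop_after_attempt, wait_random_exponential,
    )

    class TransientError(Exception):
        """Stand-in for ReplayError: only this exception type triggers a retry."""

    attempts = {'count': 0}

    @retry(retry=retry_if_exception_type(TransientError),
           stop=stop_after_attempt(3),
           wait=wait_random_exponential(multiplier=0.01, max=0.1),
           reraise=True)
    def flaky_copy():
        attempts['count'] += 1
        if attempts['count'] < 3:
            # Wrap the low-level failure so that unrelated exceptions are *not* retried.
            raise TransientError('temporary objstorage hiccup')
        return attempts['count']

    assert flaky_copy() == 3  # succeeded on the third and last allowed attempt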