diff --git a/PKG-INFO b/PKG-INFO index a7b272c..4c7cc2c 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,70 +1,70 @@ Metadata-Version: 2.1 Name: swh.journal -Version: 0.0.27 +Version: 0.0.28 Summary: Software Heritage Journal utilities Home-page: https://forge.softwareheritage.org/diffusion/DJNL/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-journal Description: swh-journal =========== Persistent logger of changes to the archive, with publish-subscribe support. See the [documentation](https://docs.softwareheritage.org/devel/swh-journal/index.html#software-heritage-journal) for more details. # Local test As a pre-requisite, you need a kakfa installation path. The following target will take care of this: ``` make install ``` Then, provided you are in the right virtual environment as described in the [swh getting-started](https://docs.softwareheritage.org/devel/developer-setup.html#developer-setup): ``` pytest ``` or: ``` tox ``` # Running ## publisher Command: ``` $ swh-journal --config-file ~/.config/swh/journal/publisher.yml \ publisher ``` # Auto-completion To have the completion, add the following in your ~/.virtualenvs/swh/bin/postactivate: ``` eval "$(_SWH_JOURNAL_COMPLETE=$autocomplete_cmd swh-journal)" ``` Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/requirements-swh.txt b/requirements-swh.txt index 932ebfd..e172a16 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,3 +1,3 @@ swh.core[db,http] >= 0.0.60 swh.model >= 0.0.60 -swh.storage >= 0.0.178 +swh.storage >= 0.0.181 diff --git a/swh.journal.egg-info/PKG-INFO b/swh.journal.egg-info/PKG-INFO index a7b272c..4c7cc2c 100644 --- a/swh.journal.egg-info/PKG-INFO +++ b/swh.journal.egg-info/PKG-INFO @@ -1,70 +1,70 @@ Metadata-Version: 2.1 Name: swh.journal -Version: 0.0.27 +Version: 0.0.28 Summary: Software Heritage Journal utilities Home-page: https://forge.softwareheritage.org/diffusion/DJNL/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-journal Description: swh-journal =========== Persistent logger of changes to the archive, with publish-subscribe support. See the [documentation](https://docs.softwareheritage.org/devel/swh-journal/index.html#software-heritage-journal) for more details. # Local test As a pre-requisite, you need a kakfa installation path. 
The following target will take care of this: ``` make install ``` Then, provided you are in the right virtual environment as described in the [swh getting-started](https://docs.softwareheritage.org/devel/developer-setup.html#developer-setup): ``` pytest ``` or: ``` tox ``` # Running ## publisher Command: ``` $ swh-journal --config-file ~/.config/swh/journal/publisher.yml \ publisher ``` # Auto-completion To have the completion, add the following in your ~/.virtualenvs/swh/bin/postactivate: ``` eval "$(_SWH_JOURNAL_COMPLETE=$autocomplete_cmd swh-journal)" ``` Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/swh.journal.egg-info/SOURCES.txt b/swh.journal.egg-info/SOURCES.txt index ac59be6..ed5ca73 100644 --- a/swh.journal.egg-info/SOURCES.txt +++ b/swh.journal.egg-info/SOURCES.txt @@ -1,37 +1,38 @@ MANIFEST.in Makefile README.md requirements-swh.txt requirements-test.txt requirements.txt setup.py version.txt swh/__init__.py swh.journal.egg-info/PKG-INFO swh.journal.egg-info/SOURCES.txt swh.journal.egg-info/dependency_links.txt swh.journal.egg-info/entry_points.txt swh.journal.egg-info/requires.txt swh.journal.egg-info/top_level.txt swh/journal/__init__.py swh/journal/backfill.py swh/journal/cli.py swh/journal/client.py swh/journal/direct_writer.py +swh/journal/fixer.py swh/journal/py.typed swh/journal/replay.py swh/journal/serializers.py swh/journal/tests/__init__.py swh/journal/tests/conftest.py swh/journal/tests/log4j.properties swh/journal/tests/test_backfill.py swh/journal/tests/test_cli.py swh/journal/tests/test_client.py swh/journal/tests/test_kafka_writer.py swh/journal/tests/test_replay.py swh/journal/tests/test_serializers.py swh/journal/tests/test_write_replay.py swh/journal/tests/utils.py swh/journal/writer/__init__.py swh/journal/writer/inmemory.py swh/journal/writer/kafka.py \ No newline at end of file diff --git a/swh.journal.egg-info/requires.txt b/swh.journal.egg-info/requires.txt index d70491f..4e69043 100644 --- a/swh.journal.egg-info/requires.txt +++ b/swh.journal.egg-info/requires.txt @@ -1,13 +1,13 @@ confluent-kafka msgpack tenacity vcversioner swh.core[db,http]>=0.0.60 swh.model>=0.0.60 -swh.storage>=0.0.178 +swh.storage>=0.0.181 [testing] pytest swh.model>=0.0.34 pytest-kafka hypothesis diff --git a/swh/journal/fixer.py b/swh/journal/fixer.py new file mode 100644 index 0000000..f3f5797 --- /dev/null +++ b/swh/journal/fixer.py @@ -0,0 +1,268 @@ +import copy +import logging +from typing import Any, Dict, List, Optional +from swh.model.identifiers import normalize_timestamp + +logger = logging.getLogger(__name__) + + +def _fix_content(content: Dict[str, Any]) -> Dict[str, Any]: + """Filters-out invalid 'perms' key that leaked from swh.model.from_disk + to the journal. 
+ + >>> _fix_content({'perms': 0o100644, 'sha1_git': b'foo'}) + {'sha1_git': b'foo'} + + >>> _fix_content({'sha1_git': b'bar'}) + {'sha1_git': b'bar'} + + """ + content = content.copy() + content.pop('perms', None) + return content + + +def _fix_revision_pypi_empty_string(rev): + """PyPI loader failed to encode empty strings as bytes, see: + swh:1:rev:8f0095ee0664867055d03de9bcc8f95b91d8a2b9 + or https://forge.softwareheritage.org/D1772 + """ + rev = { + **rev, + 'author': rev['author'].copy(), + 'committer': rev['committer'].copy(), + } + if rev['author'].get('email') == '': + rev['author']['email'] = b'' + if rev['author'].get('name') == '': + rev['author']['name'] = b'' + if rev['committer'].get('email') == '': + rev['committer']['email'] = b'' + if rev['committer'].get('name') == '': + rev['committer']['name'] = b'' + return rev + + +def _fix_revision_transplant_source(rev): + if rev.get('metadata') and rev['metadata'].get('extra_headers'): + rev = copy.deepcopy(rev) + rev['metadata']['extra_headers'] = [ + [key, value.encode('ascii')] + if key == 'transplant_source' and isinstance(value, str) + else [key, value] + for (key, value) in rev['metadata']['extra_headers']] + return rev + + +def _check_date(date): + """Returns whether the date can be represented in backends with sane + limits on timestamps and timezones (resp. signed 64-bits and + signed 16 bits), and that microseconds is valid (ie. between 0 and 10^6). + """ + if date is None: + return True + date = normalize_timestamp(date) + return (-2**63 <= date['timestamp']['seconds'] < 2**63) \ + and (0 <= date['timestamp']['microseconds'] < 10**6) \ + and (-2**15 <= date['offset'] < 2**15) + + +def _check_revision_date(rev): + """Exclude revisions with invalid dates. + See https://forge.softwareheritage.org/T1339""" + return _check_date(rev['date']) and _check_date(rev['committer_date']) + + +def _fix_revision(revision: Dict[str, Any]) -> Optional[Dict]: + """Fix various legacy revision issues. + + Fix author/committer person: + + >>> from pprint import pprint + >>> date = { + ... 'timestamp': { + ... 'seconds': 1565096932, + ... 'microseconds': 0, + ... }, + ... 'offset': 0, + ... } + >>> rev0 = _fix_revision({ + ... 'id': b'rev-id', + ... 'author': {'fullname': b'', 'name': '', 'email': ''}, + ... 'committer': {'fullname': b'', 'name': '', 'email': ''}, + ... 'date': date, + ... 'committer_date': date, + ... 'type': 'git', + ... 'message': '', + ... 'directory': b'dir-id', + ... 'synthetic': False, + ... }) + >>> rev0['author'] + {'fullname': b'', 'name': b'', 'email': b''} + >>> rev0['committer'] + {'fullname': b'', 'name': b'', 'email': b''} + + Fix type of 'transplant_source' extra headers: + + >>> rev1 = _fix_revision({ + ... 'id': b'rev-id', + ... 'author': {'fullname': b'', 'name': '', 'email': ''}, + ... 'committer': {'fullname': b'', 'name': '', 'email': ''}, + ... 'date': date, + ... 'committer_date': date, + ... 'metadata': { + ... 'extra_headers': [ + ... ['time_offset_seconds', b'-3600'], + ... ['transplant_source', '29c154a012a70f49df983625090434587622b39e'] + ... ]}, + ... 'type': 'git', + ... 'message': '', + ... 'directory': b'dir-id', + ... 'synthetic': False, + ... 
}) + >>> pprint(rev1['metadata']['extra_headers']) + [['time_offset_seconds', b'-3600'], + ['transplant_source', b'29c154a012a70f49df983625090434587622b39e']] + + Revisions with invalid dates are filtered out: + + >>> from copy import deepcopy + >>> invalid_date1 = deepcopy(date) + >>> invalid_date1['timestamp']['microseconds'] = 1000000000 # > 10^6 + >>> rev = _fix_revision({ + ... 'author': {'fullname': b'', 'name': '', 'email': ''}, + ... 'committer': {'fullname': b'', 'name': '', 'email': ''}, + ... 'date': invalid_date1, + ... 'committer_date': date, + ... }) + >>> rev is None + True + + >>> invalid_date2 = deepcopy(date) + >>> invalid_date2['timestamp']['seconds'] = 2**70 # > 2^63 + >>> rev = _fix_revision({ + ... 'author': {'fullname': b'', 'name': '', 'email': ''}, + ... 'committer': {'fullname': b'', 'name': '', 'email': ''}, + ... 'date': invalid_date2, + ... 'committer_date': date, + ... }) + >>> rev is None + True + + >>> invalid_date3 = deepcopy(date) + >>> invalid_date3['offset'] = 2**20 # > 2^15 + >>> rev = _fix_revision({ + ... 'author': {'fullname': b'', 'name': '', 'email': ''}, + ... 'committer': {'fullname': b'', 'name': '', 'email': ''}, + ... 'date': date, + ... 'committer_date': invalid_date3, + ... }) + >>> rev is None + True + + """ # noqa + rev = _fix_revision_pypi_empty_string(revision) + rev = _fix_revision_transplant_source(rev) + if not _check_revision_date(rev): + logger.warning('Invalid revision date detected: %(revision)s', { + 'revision': rev + }) + return None + return rev + + +def _fix_origin_visit(visit: Dict) -> Dict: + """Fix various legacy origin visit issues. + + `visit['origin']` is a dict instead of an URL: + + >>> from datetime import datetime, timezone + >>> from pprint import pprint + >>> date = datetime(2020, 2, 27, 14, 39, 19, tzinfo=timezone.utc) + >>> pprint(_fix_origin_visit({ + ... 'origin': {'url': 'http://foo'}, + ... 'date': date, + ... 'type': 'git', + ... 'status': 'ongoing', + ... 'snapshot': None, + ... })) + {'date': datetime.datetime(2020, 2, 27, 14, 39, 19, tzinfo=datetime.timezone.utc), + 'metadata': None, + 'origin': 'http://foo', + 'snapshot': None, + 'status': 'ongoing', + 'type': 'git'} + + `visit['type']` is missing, but `visit['origin']['type']` exists: + + >>> pprint(_fix_origin_visit( + ... {'origin': {'type': 'hg', 'url': 'http://foo'}, + ... 'date': date, + ... 'status': 'ongoing', + ... 'snapshot': None, + ... })) + {'date': datetime.datetime(2020, 2, 27, 14, 39, 19, tzinfo=datetime.timezone.utc), + 'metadata': None, + 'origin': 'http://foo', + 'snapshot': None, + 'status': 'ongoing', + 'type': 'hg'} + + Old visit format (origin_visit with no type) raises: + + >>> _fix_origin_visit({ + ... 'origin': {'url': 'http://foo'}, + ... 'date': date, + ... 'status': 'ongoing', + ... 'snapshot': None + ... }) + Traceback (most recent call last): + ... + ValueError: Old origin visit format detected... + + >>> _fix_origin_visit({ + ... 'origin': 'http://foo', + ... 'date': date, + ... 'status': 'ongoing', + ... 'snapshot': None + ... }) + Traceback (most recent call last): + ... + ValueError: Old origin visit format detected... + + """ # noqa + visit = visit.copy() + if 'type' not in visit: + if isinstance(visit['origin'], dict) and 'type' in visit['origin']: + # Very old version of the schema: visits did not have a type, + # but their 'origin' field was a dict with a 'type' key.
+ visit['type'] = visit['origin']['type'] + else: + # Very old schema version: 'type' is missing, stop early + + # We expect the journal's origin_visit topic to no longer reference + # such visits. If it does, the replayer must crash so we can fix + # the journal's topic. + raise ValueError(f'Old origin visit format detected: {visit}') + if isinstance(visit['origin'], dict): + # Old version of the schema: visit['origin'] was a dict. + visit['origin'] = visit['origin']['url'] + if 'metadata' not in visit: + visit['metadata'] = None + return visit + + +def fix_objects(object_type: str, objects: List[Dict]) -> List[Dict]: + """ + Fix legacy objects from the journal to bring them up to date with the + latest storage schema. + """ + if object_type == 'content': + return [_fix_content(v) for v in objects] + elif object_type == 'revision': + revisions = [_fix_revision(v) for v in objects] + return [rev for rev in revisions if rev is not None] + elif object_type == 'origin_visit': + return [_fix_origin_visit(v) for v in objects] + else: + return objects diff --git a/swh/journal/replay.py b/swh/journal/replay.py index 83a3227..3814e20 100644 --- a/swh/journal/replay.py +++ b/swh/journal/replay.py @@ -1,586 +1,393 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import copy import logging from time import time from typing import ( Any, Callable, Dict, Iterable, List, Optional ) from sentry_sdk import capture_exception, push_scope try: from systemd.daemon import notify except ImportError: notify = None from tenacity import ( retry, retry_if_exception_type, stop_after_attempt, wait_random_exponential, ) from swh.core.statsd import statsd -from swh.model.identifiers import normalize_timestamp +from swh.journal.fixer import fix_objects from swh.model.hashutil import hash_to_hex from swh.model.model import ( BaseContent, BaseModel, Content, Directory, Origin, OriginVisit, Revision, SHA1_SIZE, SkippedContent, Snapshot, Release ) from swh.objstorage.objstorage import ( ID_HASH_ALGO, ObjNotFoundError, ObjStorage, ) -from swh.storage import HashCollision +from swh.storage.exc import HashCollision logger = logging.getLogger(__name__) GRAPH_OPERATIONS_METRIC = "swh_graph_replayer_operations_total" GRAPH_DURATION_METRIC = "swh_graph_replayer_duration_seconds" CONTENT_OPERATIONS_METRIC = "swh_content_replayer_operations_total" CONTENT_RETRY_METRIC = "swh_content_replayer_retries_total" CONTENT_BYTES_METRIC = "swh_content_replayer_bytes" CONTENT_DURATION_METRIC = "swh_content_replayer_duration_seconds" object_converter_fn: Dict[str, Callable[[Dict], BaseModel]] = { 'origin': Origin.from_dict, 'origin_visit': OriginVisit.from_dict, 'snapshot': Snapshot.from_dict, 'revision': Revision.from_dict, 'release': Release.from_dict, 'directory': Directory.from_dict, 'content': Content.from_dict, 'skipped_content': SkippedContent.from_dict, } def process_replay_objects(all_objects, *, storage): for (object_type, objects) in all_objects.items(): logger.debug("Inserting %s %s objects", len(objects), object_type) with statsd.timed(GRAPH_DURATION_METRIC, tags={'object_type': object_type}): _insert_objects(object_type, objects, storage) statsd.increment(GRAPH_OPERATIONS_METRIC, len(objects), tags={'object_type': object_type}) if notify: notify('WATCHDOG=1') -def _fix_revision_pypi_empty_string(rev): - """PyPI loader failed to 
encode empty strings as bytes, see: - swh:1:rev:8f0095ee0664867055d03de9bcc8f95b91d8a2b9 - or https://forge.softwareheritage.org/D1772 - """ - rev = { - **rev, - 'author': rev['author'].copy(), - 'committer': rev['committer'].copy(), - } - if rev['author'].get('email') == '': - rev['author']['email'] = b'' - if rev['author'].get('name') == '': - rev['author']['name'] = b'' - if rev['committer'].get('email') == '': - rev['committer']['email'] = b'' - if rev['committer'].get('name') == '': - rev['committer']['name'] = b'' - return rev - - -def _fix_revision_transplant_source(rev): - if rev.get('metadata') and rev['metadata'].get('extra_headers'): - rev = copy.deepcopy(rev) - rev['metadata']['extra_headers'] = [ - [key, value.encode('ascii')] - if key == 'transplant_source' and isinstance(value, str) - else [key, value] - for (key, value) in rev['metadata']['extra_headers']] - return rev - - -def _check_date(date): - """Returns whether the date can be represented in backends with sane - limits on timestamps and timezones (resp. signed 64-bits and - signed 16 bits), and that microseconds is valid (ie. between 0 and 10^6). - """ - if date is None: - return True - date = normalize_timestamp(date) - return (-2**63 <= date['timestamp']['seconds'] < 2**63) \ - and (0 <= date['timestamp']['microseconds'] < 10**6) \ - and (-2**15 <= date['offset'] < 2**15) - - -def _check_revision_date(rev): - """Exclude revisions with invalid dates. - See https://forge.softwareheritage.org/T1339""" - return _check_date(rev['date']) and _check_date(rev['committer_date']) - - -def _fix_revisions(revisions: List[Dict]) -> List[Dict]: - """Adapt revisions into a list of (current) storage compatible dicts. - - >>> from pprint import pprint - >>> date = { - ... 'timestamp': { - ... 'seconds': 1565096932, - ... 'microseconds': 0, - ... }, - ... 'offset': 0, - ... } - >>> pprint(_fix_revisions([{ - ... 'author': {'email': '', 'fullname': b'', 'name': ''}, - ... 'committer': {'email': '', 'fullname': b'', 'name': ''}, - ... 'date': date, - ... 'committer_date': date, - ... }])) - [{'author': {'email': b'', 'fullname': b'', 'name': b''}, - 'committer': {'email': b'', 'fullname': b'', 'name': b''}, - 'committer_date': {'offset': 0, - 'timestamp': {'microseconds': 0, 'seconds': 1565096932}}, - 'date': {'offset': 0, - 'timestamp': {'microseconds': 0, 'seconds': 1565096932}}}] - - Fix type of 'transplant_source' extra headers: - - >>> revs = _fix_revisions([{ - ... 'author': {'email': '', 'fullname': b'', 'name': ''}, - ... 'committer': {'email': '', 'fullname': b'', 'name': ''}, - ... 'date': date, - ... 'committer_date': date, - ... 'metadata': { - ... 'extra_headers': [ - ... ['time_offset_seconds', b'-3600'], - ... ['transplant_source', '29c154a012a70f49df983625090434587622b39e'] # noqa - ... ]} - ... }]) - >>> pprint(revs[0]['metadata']['extra_headers']) - [['time_offset_seconds', b'-3600'], - ['transplant_source', b'29c154a012a70f49df983625090434587622b39e']] - - Filter out revisions with invalid dates: - - >>> from copy import deepcopy - >>> invalid_date1 = deepcopy(date) - >>> invalid_date1['timestamp']['microseconds'] = 1000000000 # > 10^6 - >>> _fix_revisions([{ - ... 'author': {'email': '', 'fullname': b'', 'name': b''}, - ... 'committer': {'email': '', 'fullname': b'', 'name': b''}, - ... 'date': invalid_date1, - ... 'committer_date': date, - ... }]) - [] - - >>> invalid_date2 = deepcopy(date) - >>> invalid_date2['timestamp']['seconds'] = 2**70 # > 10^63 - >>> _fix_revisions([{ - ... 
'author': {'email': '', 'fullname': b'', 'name': b''}, - ... 'committer': {'email': '', 'fullname': b'', 'name': b''}, - ... 'date': invalid_date2, - ... 'committer_date': date, - ... }]) - [] - - >>> invalid_date3 = deepcopy(date) - >>> invalid_date3['offset'] = 2**20 # > 10^15 - >>> _fix_revisions([{ - ... 'author': {'email': '', 'fullname': b'', 'name': b''}, - ... 'committer': {'email': '', 'fullname': b'', 'name': b''}, - ... 'date': date, - ... 'committer_date': invalid_date3, - ... }]) - [] - - """ - good_revisions: List = [] - for rev in revisions: - rev = _fix_revision_pypi_empty_string(rev) - rev = _fix_revision_transplant_source(rev) - if not _check_revision_date(rev): - logging.warning('Excluding revision (invalid date): %r', rev) - continue - if rev not in good_revisions: - good_revisions.append(rev) - return good_revisions - - -def _fix_origin_visit(visit: Dict) -> OriginVisit: - """Adapt origin visits into a list of current storage compatible - OriginVisits. - - `visit['origin']` is a dict instead of an URL: - - >>> from datetime import datetime, timezone - >>> from pprint import pprint - >>> date = datetime(2020, 2, 27, 14, 39, 19, tzinfo=timezone.utc) - >>> pprint(_fix_origin_visit({ - ... 'origin': {'url': 'http://foo'}, - ... 'date': date, - ... 'type': 'git', - ... 'status': 'ongoing', - ... 'snapshot': None, - ... }).to_dict()) - {'date': datetime.datetime(2020, 2, 27, 14, 39, 19, tzinfo=datetime.timezone.utc), - 'metadata': None, - 'origin': 'http://foo', - 'snapshot': None, - 'status': 'ongoing', - 'type': 'git'} - - `visit['type']` is missing , but `origin['visit']['type']` exists: - - >>> pprint(_fix_origin_visit( - ... {'origin': {'type': 'hg', 'url': 'http://foo'}, - ... 'date': date, - ... 'status': 'ongoing', - ... 'snapshot': None, - ... }).to_dict()) - {'date': datetime.datetime(2020, 2, 27, 14, 39, 19, tzinfo=datetime.timezone.utc), - 'metadata': None, - 'origin': 'http://foo', - 'snapshot': None, - 'status': 'ongoing', - 'type': 'hg'} - - """ # noqa - visit = visit.copy() - if 'type' not in visit: - if isinstance(visit['origin'], dict) and 'type' in visit['origin']: - # Very old version of the schema: visits did not have a type, - # but their 'origin' field was a dict with a 'type' key. - visit['type'] = visit['origin']['type'] - else: - # Very very old version of the schema: 'type' is missing, - # so there is nothing we can do to fix it. - raise ValueError('Got an origin_visit too old to be replayed.') - if isinstance(visit['origin'], dict): - # Old version of the schema: visit['origin'] was a dict. - visit['origin'] = visit['origin']['url'] - if 'metadata' not in visit: - visit['metadata'] = None - return OriginVisit.from_dict(visit) - - def collision_aware_content_add( content_add_fn: Callable[[Iterable[Any]], None], contents: List[BaseContent]) -> None: """Add contents to storage. If a hash collision is detected, an error is logged. Then this adds the other non colliding contents to the storage. 
Args: content_add_fn: Storage content callable contents: List of contents or skipped contents to add to storage """ if not contents: return colliding_content_hashes: List[Dict[str, Any]] = [] while True: try: content_add_fn(contents) except HashCollision as e: - algo, hash_id, colliding_hashes = e.args - hash_id = hash_to_hex(hash_id) colliding_content_hashes.append({ - 'algo': algo, - 'hash': hash_to_hex(hash_id), - 'objects': [{k: hash_to_hex(v) for k, v in collision.items()} - for collision in colliding_hashes] + 'algo': e.algo, + 'hash': e.hash_id, # hex hash id + 'objects': e.colliding_contents # hex hashes }) + colliding_hashes = e.colliding_content_hashes() # Drop the colliding contents from the transaction contents = [c for c in contents if c.hashes() not in colliding_hashes] else: # Successfully added contents, we are done break if colliding_content_hashes: for collision in colliding_content_hashes: logger.error('Collision detected: %(collision)s', { 'collision': collision }) def _insert_objects(object_type: str, objects: List[Dict], storage) -> None: """Insert objects of type object_type in the storage. """ + objects = fix_objects(object_type, objects) + if object_type == 'content': contents: List[BaseContent] = [] skipped_contents: List[BaseContent] = [] for content in objects: c = BaseContent.from_dict(content) if isinstance(c, SkippedContent): skipped_contents.append(c) else: contents.append(c) collision_aware_content_add( storage.skipped_content_add, skipped_contents) collision_aware_content_add( storage.content_add_metadata, contents) - elif object_type == 'revision': - storage.revision_add( - Revision.from_dict(r) for r in _fix_revisions(objects) - ) elif object_type == 'origin_visit': - visits = [_fix_origin_visit(v) for v in objects] - storage.origin_add(Origin(url=v.origin) for v in visits) + visits: List[OriginVisit] = [] + origins: List[Origin] = [] + for obj in objects: + visit = OriginVisit.from_dict(obj) + visits.append(visit) + origins.append(Origin(url=visit.origin)) + storage.origin_add(origins) storage.origin_visit_upsert(visits) - elif object_type in ('directory', 'release', 'snapshot', 'origin'): + elif object_type in ( + 'directory', 'revision', 'release', 'snapshot', 'origin' + ): method = getattr(storage, object_type + '_add') method(object_converter_fn[object_type](o) for o in objects) else: logger.warning('Received a series of %s, this should not happen', object_type) def is_hash_in_bytearray(hash_, array, nb_hashes, hash_size=SHA1_SIZE): """ Checks if the given hash is in the provided `array`. The array must be a *sorted* list of sha1 hashes, and contain `nb_hashes` hashes (so its size must by `nb_hashes*hash_size` bytes). Args: hash_ (bytes): the hash to look for array (bytes): a sorted concatenated array of hashes (may be of any type supporting slice indexing, eg. 
:class:`mmap.mmap`) nb_hashes (int): number of hashes in the array hash_size (int): size of a hash (defaults to 20, for SHA1) Example: >>> import os >>> hash1 = os.urandom(20) >>> hash2 = os.urandom(20) >>> hash3 = os.urandom(20) >>> array = b''.join(sorted([hash1, hash2])) >>> is_hash_in_bytearray(hash1, array, 2) True >>> is_hash_in_bytearray(hash2, array, 2) True >>> is_hash_in_bytearray(hash3, array, 2) False """ if len(hash_) != hash_size: raise ValueError('hash_ does not match the provided hash_size.') def get_hash(position): return array[position*hash_size:(position+1)*hash_size] # Regular dichotomy: left = 0 right = nb_hashes while left < right-1: middle = int((right+left)/2) pivot = get_hash(middle) if pivot == hash_: return True elif pivot < hash_: left = middle else: right = middle return get_hash(left) == hash_ class ReplayError(Exception): """An error occurred during the replay of an object""" def __init__(self, operation, *, obj_id, exc): self.operation = operation self.obj_id = hash_to_hex(obj_id) self.exc = exc def __str__(self): return "ReplayError(doing %s, %s, %s)" % ( self.operation, self.obj_id, self.exc ) def log_replay_retry(retry_obj, sleep, last_result): """Log a retry of the content replayer""" exc = last_result.exception() logger.debug('Retry operation %(operation)s on %(obj_id)s: %(exc)s', {'operation': exc.operation, 'obj_id': exc.obj_id, 'exc': str(exc.exc)}) statsd.increment(CONTENT_RETRY_METRIC, tags={ 'operation': exc.operation, 'attempt': str(retry_obj.statistics['attempt_number']), }) def log_replay_error(last_attempt): """Log a replay error to sentry""" exc = last_attempt.exception() with push_scope() as scope: scope.set_tag('operation', exc.operation) scope.set_extra('obj_id', exc.obj_id) capture_exception(exc.exc) logger.error( 'Failed operation %(operation)s on %(obj_id)s after %(retries)s' ' retries: %(exc)s', { 'obj_id': exc.obj_id, 'operation': exc.operation, 'exc': str(exc.exc), 'retries': last_attempt.attempt_number, }) return None CONTENT_REPLAY_RETRIES = 3 content_replay_retry = retry( retry=retry_if_exception_type(ReplayError), stop=stop_after_attempt(CONTENT_REPLAY_RETRIES), wait=wait_random_exponential(multiplier=1, max=60), before_sleep=log_replay_retry, retry_error_callback=log_replay_error, ) @content_replay_retry def copy_object(obj_id, src, dst): hex_obj_id = hash_to_hex(obj_id) obj = '' try: with statsd.timed(CONTENT_DURATION_METRIC, tags={'request': 'get'}): obj = src.get(obj_id) logger.debug('retrieved %(obj_id)s', {'obj_id': hex_obj_id}) with statsd.timed(CONTENT_DURATION_METRIC, tags={'request': 'put'}): dst.add(obj, obj_id=obj_id, check_presence=False) logger.debug('copied %(obj_id)s', {'obj_id': hex_obj_id}) statsd.increment(CONTENT_BYTES_METRIC, len(obj)) except ObjNotFoundError: logger.error('Failed to copy %(obj_id)s: object not found', {'obj_id': hex_obj_id}) raise except Exception as exc: raise ReplayError('copy', obj_id=obj_id, exc=exc) from None return len(obj) @content_replay_retry def obj_in_objstorage(obj_id, dst): """Check if an object is already in an objstorage, tenaciously""" try: return obj_id in dst except Exception as exc: raise ReplayError('in_dst', obj_id=obj_id, exc=exc) from None def process_replay_objects_content( all_objects: Dict[str, List[dict]], *, src: ObjStorage, dst: ObjStorage, exclude_fn: Optional[Callable[[dict], bool]] = None, check_dst: bool = True, ): """ Takes a list of records from Kafka (see :py:func:`swh.journal.client.JournalClient.process`) and copies them from the `src` objstorage to the 
`dst` objstorage, if: * `obj['status']` is `'visible'` * `exclude_fn(obj)` is `False` (if `exclude_fn` is provided) * `obj['sha1'] not in dst` (if `check_dst` is True) Args: all_objects: Objects passed by the Kafka client. Most importantly, `all_objects['content'][*]['sha1']` is the sha1 hash of each content. src: An object storage (see :py:func:`swh.objstorage.get_objstorage`) dst: An object storage (see :py:func:`swh.objstorage.get_objstorage`) exclude_fn: Determines whether an object should be copied. check_dst: Determines whether we should check the destination objstorage before copying. Example: >>> from swh.objstorage import get_objstorage >>> src = get_objstorage('memory', {}) >>> dst = get_objstorage('memory', {}) >>> id1 = src.add(b'foo bar') >>> id2 = src.add(b'baz qux') >>> kafka_partitions = { ... 'content': [ ... { ... 'sha1': id1, ... 'status': 'visible', ... }, ... { ... 'sha1': id2, ... 'status': 'visible', ... }, ... ] ... } >>> process_replay_objects_content( ... kafka_partitions, src=src, dst=dst, ... exclude_fn=lambda obj: obj['sha1'] == id1) >>> id1 in dst False >>> id2 in dst True """ vol = [] nb_skipped = 0 nb_failures = 0 t0 = time() for (object_type, objects) in all_objects.items(): if object_type != 'content': logger.warning( 'Received a series of %s, this should not happen', object_type) continue for obj in objects: obj_id = obj[ID_HASH_ALGO] if obj['status'] != 'visible': nb_skipped += 1 logger.debug('skipped %s (status=%s)', hash_to_hex(obj_id), obj['status']) statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "skipped", "status": obj["status"]}) elif exclude_fn and exclude_fn(obj): nb_skipped += 1 logger.debug('skipped %s (manually excluded)', hash_to_hex(obj_id)) statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "excluded"}) elif check_dst and obj_in_objstorage(obj_id, dst): nb_skipped += 1 logger.debug('skipped %s (in dst)', hash_to_hex(obj_id)) statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "in_dst"}) else: try: copied = copy_object(obj_id, src, dst) except ObjNotFoundError: nb_skipped += 1 statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "not_in_src"}) else: if copied is None: nb_failures += 1 statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "failed"}) else: vol.append(copied) statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "copied"}) dt = time() - t0 logger.info( 'processed %s content objects in %.1fsec ' '(%.1f obj/sec, %.1fMB/sec) - %d failed - %d skipped', len(vol), dt, len(vol)/dt, sum(vol)/1024/1024/dt, nb_failures, nb_skipped) if notify: notify('WATCHDOG=1') diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py index 3865da0..6157617 100644 --- a/swh/journal/tests/test_replay.py +++ b/swh/journal/tests/test_replay.py @@ -1,400 +1,417 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import functools import logging import random from subprocess import Popen -from typing import Dict, Tuple +from typing import Dict, List, Tuple import dateutil +import pytest + from confluent_kafka import Producer from hypothesis import strategies, given, settings -import pytest from swh.storage import get_storage from swh.journal.client import JournalClient from swh.journal.serializers import key_to_kafka, value_to_kafka from swh.journal.replay import 
process_replay_objects, is_hash_in_bytearray from swh.model.hashutil import hash_to_hex from swh.model.model import Content from .conftest import OBJECT_TYPE_KEYS, DUPLICATE_CONTENTS from .utils import MockedJournalClient, MockedKafkaWriter storage_config = { 'cls': 'pipeline', 'steps': [ {'cls': 'memory'}, ] } def make_topic(kafka_prefix: str, object_type: str) -> str: return kafka_prefix + '.' + object_type def test_storage_play( kafka_prefix: str, kafka_consumer_group: str, kafka_server: Tuple[Popen, int], caplog): """Optimal replayer scenario. This: - writes objects to the topic - replayer consumes objects from the topic and replay them """ (_, port) = kafka_server kafka_prefix += '.swh.journal.objects' storage = get_storage(**storage_config) producer = Producer({ 'bootstrap.servers': 'localhost:{}'.format(port), 'client.id': 'test producer', 'acks': 'all', }) now = datetime.datetime.now(tz=datetime.timezone.utc) # Fill Kafka nb_sent = 0 nb_visits = 0 for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items(): topic = make_topic(kafka_prefix, object_type) for object_ in objects: key = bytes(random.randint(0, 255) for _ in range(40)) object_ = object_.copy() if object_type == 'content': object_['ctime'] = now elif object_type == 'origin_visit': nb_visits += 1 object_['visit'] = nb_visits producer.produce( topic=topic, key=key_to_kafka(key), value=value_to_kafka(object_), ) nb_sent += 1 producer.flush() caplog.set_level(logging.ERROR, 'swh.journal.replay') # Fill the storage from Kafka replayer = JournalClient( brokers='localhost:%d' % kafka_server[1], group_id=kafka_consumer_group, prefix=kafka_prefix, stop_after_objects=nb_sent, ) worker_fn = functools.partial(process_replay_objects, storage=storage) nb_inserted = 0 while nb_inserted < nb_sent: nb_inserted += replayer.process(worker_fn) assert nb_sent == nb_inserted # Check the objects were actually inserted in the storage assert OBJECT_TYPE_KEYS['revision'][1] == \ list(storage.revision_get( [rev['id'] for rev in OBJECT_TYPE_KEYS['revision'][1]])) assert OBJECT_TYPE_KEYS['release'][1] == \ list(storage.release_get( [rel['id'] for rel in OBJECT_TYPE_KEYS['release'][1]])) origins = list(storage.origin_get( [orig for orig in OBJECT_TYPE_KEYS['origin'][1]])) assert OBJECT_TYPE_KEYS['origin'][1] == \ [{'url': orig['url']} for orig in origins] for origin in origins: origin_url = origin['url'] expected_visits = [ { **visit, 'origin': origin_url, 'date': dateutil.parser.parse(visit['date']), } for visit in OBJECT_TYPE_KEYS['origin_visit'][1] if visit['origin'] == origin['url'] ] actual_visits = list(storage.origin_visit_get( origin_url)) for visit in actual_visits: del visit['visit'] # opaque identifier assert expected_visits == actual_visits input_contents = OBJECT_TYPE_KEYS['content'][1] contents = storage.content_get_metadata( [cont['sha1'] for cont in input_contents]) assert len(contents) == len(input_contents) assert contents == {cont['sha1']: [cont] for cont in input_contents} collision = 0 for record in caplog.records: logtext = record.getMessage() if 'Colliding contents:' in logtext: collision += 1 assert collision == 0, "No collision should be detected" def test_storage_play_with_collision( kafka_prefix: str, kafka_consumer_group: str, kafka_server: Tuple[Popen, int], caplog): """Another replayer scenario with collisions. 
This: - writes objects to the topic, including colliding contents - replayer consumes objects from the topic and replay them - This drops the colliding contents from the replay when detected """ (_, port) = kafka_server kafka_prefix += '.swh.journal.objects' storage = get_storage(**storage_config) producer = Producer({ 'bootstrap.servers': 'localhost:{}'.format(port), 'client.id': 'test producer', 'enable.idempotence': 'true', }) now = datetime.datetime.now(tz=datetime.timezone.utc) # Fill Kafka nb_sent = 0 nb_visits = 0 for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items(): topic = make_topic(kafka_prefix, object_type) for object_ in objects: key = bytes(random.randint(0, 255) for _ in range(40)) object_ = object_.copy() if object_type == 'content': object_['ctime'] = now elif object_type == 'origin_visit': nb_visits += 1 object_['visit'] = nb_visits producer.produce( topic=topic, key=key_to_kafka(key), value=value_to_kafka(object_), ) nb_sent += 1 # Create collision in input data # They are not written in the destination for content in DUPLICATE_CONTENTS: topic = make_topic(kafka_prefix, 'content') producer.produce( topic=topic, key=key_to_kafka(key), value=value_to_kafka(content), ) nb_sent += 1 producer.flush() caplog.set_level(logging.ERROR, 'swh.journal.replay') # Fill the storage from Kafka replayer = JournalClient( brokers='localhost:%d' % kafka_server[1], group_id=kafka_consumer_group, prefix=kafka_prefix, stop_after_objects=nb_sent, ) worker_fn = functools.partial(process_replay_objects, storage=storage) nb_inserted = 0 while nb_inserted < nb_sent: nb_inserted += replayer.process(worker_fn) assert nb_sent == nb_inserted # Check the objects were actually inserted in the storage assert OBJECT_TYPE_KEYS['revision'][1] == \ list(storage.revision_get( [rev['id'] for rev in OBJECT_TYPE_KEYS['revision'][1]])) assert OBJECT_TYPE_KEYS['release'][1] == \ list(storage.release_get( [rel['id'] for rel in OBJECT_TYPE_KEYS['release'][1]])) origins = list(storage.origin_get( [orig for orig in OBJECT_TYPE_KEYS['origin'][1]])) assert OBJECT_TYPE_KEYS['origin'][1] == \ [{'url': orig['url']} for orig in origins] for origin in origins: origin_url = origin['url'] expected_visits = [ { **visit, 'origin': origin_url, 'date': dateutil.parser.parse(visit['date']), } for visit in OBJECT_TYPE_KEYS['origin_visit'][1] if visit['origin'] == origin['url'] ] actual_visits = list(storage.origin_visit_get( origin_url)) for visit in actual_visits: del visit['visit'] # opaque identifier assert expected_visits == actual_visits input_contents = OBJECT_TYPE_KEYS['content'][1] contents = storage.content_get_metadata( [cont['sha1'] for cont in input_contents]) assert len(contents) == len(input_contents) assert contents == {cont['sha1']: [cont] for cont in input_contents} nb_collisions = 0 actual_collision: Dict for record in caplog.records: logtext = record.getMessage() if 'Collision detected:' in logtext: nb_collisions += 1 actual_collision = record.args['collision'] assert nb_collisions == 1, "1 collision should be detected" algo = 'sha1' assert actual_collision['algo'] == algo expected_colliding_hash = hash_to_hex(DUPLICATE_CONTENTS[0][algo]) assert actual_collision['hash'] == expected_colliding_hash actual_colliding_hashes = actual_collision['objects'] assert len(actual_colliding_hashes) == len(DUPLICATE_CONTENTS) for content in DUPLICATE_CONTENTS: expected_content_hashes = { k: hash_to_hex(v) for k, v in Content.from_dict(content).hashes().items() } assert expected_content_hashes in actual_colliding_hashes 
-def _test_write_replay_origin_visit(visits): +def _test_write_replay_origin_visit(visits: List[Dict]): """Helper function to write tests for origin_visit. Each visit (a dict) given in the 'visits' argument will be sent to a (mocked) kafka queue, which a in-memory-storage backed replayer is listening to. Check that corresponding origin visits entities are present in the storage - and have correct values. + and have correct values if they are not skipped. """ - queue = [] + queue: List = [] replayer = MockedJournalClient(queue) writer = MockedKafkaWriter(queue) # Note that flipping the order of these two insertions will crash # the test, because the legacy origin_format does not allow to create # the origin when needed (type is missing) writer.send('origin', 'foo', { 'url': 'http://example.com/', 'type': 'git', }) for visit in visits: writer.send('origin_visit', 'foo', visit) queue_size = len(queue) assert replayer.stop_after_objects is None replayer.stop_after_objects = queue_size storage = get_storage(**storage_config) worker_fn = functools.partial(process_replay_objects, storage=storage) replayer.process(worker_fn) actual_visits = list(storage.origin_visit_get('http://example.com/')) assert len(actual_visits) == len(visits), actual_visits for vin, vout in zip(visits, actual_visits): vin = vin.copy() vout = vout.copy() assert vout.pop('origin') == 'http://example.com/' vin.pop('origin') vin.setdefault('type', 'git') vin.setdefault('metadata', None) assert vin == vout def test_write_replay_origin_visit(): """Test origin_visit when the 'origin' is just a string.""" now = datetime.datetime.now() visits = [{ 'visit': 1, 'origin': 'http://example.com/', 'date': now, 'type': 'git', 'status': 'partial', 'snapshot': None, }] _test_write_replay_origin_visit(visits) def test_write_replay_legacy_origin_visit1(): - """Test origin_visit when there is no type.""" + """Origin_visit with no types should make the replayer crash + + We expect the journal's origin_visit topic to no longer reference such + visits. If it does, the replayer must crash so we can fix the journal's + topic. 
+ + """ now = datetime.datetime.now() - visits = [{ + visit = { 'visit': 1, 'origin': 'http://example.com/', 'date': now, 'status': 'partial', 'snapshot': None, - }] - with pytest.raises(ValueError, match='too old'): - _test_write_replay_origin_visit(visits) + } + now2 = datetime.datetime.now() + visit2 = { + 'visit': 2, + 'origin': {'url': 'http://example.com/'}, + 'date': now2, + 'status': 'partial', + 'snapshot': None, + } + + for origin_visit in [visit, visit2]: + with pytest.raises(ValueError, match='Old origin visit format'): + _test_write_replay_origin_visit([origin_visit]) def test_write_replay_legacy_origin_visit2(): """Test origin_visit when 'type' is missing from the visit, but not from the origin.""" now = datetime.datetime.now() visits = [{ 'visit': 1, 'origin': { 'url': 'http://example.com/', 'type': 'git', }, 'date': now, 'type': 'git', 'status': 'partial', 'snapshot': None, }] _test_write_replay_origin_visit(visits) def test_write_replay_legacy_origin_visit3(): """Test origin_visit when the origin is a dict""" now = datetime.datetime.now() visits = [{ 'visit': 1, 'origin': { 'url': 'http://example.com/', }, 'date': now, 'type': 'git', 'status': 'partial', 'snapshot': None, }] _test_write_replay_origin_visit(visits) hash_strategy = strategies.binary(min_size=20, max_size=20) @settings(max_examples=500) @given(strategies.sets(hash_strategy, min_size=0, max_size=500), strategies.sets(hash_strategy, min_size=10)) def test_is_hash_in_bytearray(haystack, needles): array = b''.join(sorted(haystack)) needles |= haystack # Exhaustively test for all objects in the array for needle in needles: assert is_hash_in_bytearray(needle, array, len(haystack)) == \ (needle in haystack) diff --git a/swh/journal/tests/test_write_replay.py b/swh/journal/tests/test_write_replay.py index d096da6..5527038 100644 --- a/swh/journal/tests/test_write_replay.py +++ b/swh/journal/tests/test_write_replay.py @@ -1,161 +1,162 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import functools from unittest.mock import patch import attr from hypothesis import given, settings, HealthCheck from hypothesis.strategies import lists from swh.model.hypothesis_strategies import ( object_dicts, present_contents ) from swh.model.model import Origin -from swh.storage import get_storage, HashCollision +from swh.storage import get_storage +from swh.storage.exc import HashCollision from swh.journal.replay import ( process_replay_objects, process_replay_objects_content, object_converter_fn ) from .utils import MockedJournalClient, MockedKafkaWriter storage_config = { 'cls': 'memory', 'journal_writer': {'cls': 'memory'}, } def empty_person_name_email(rev_or_rel): """Empties the 'name' and 'email' fields of the author/committer fields of a revision or release; leaving only the fullname.""" if getattr(rev_or_rel, 'author', None): rev_or_rel = attr.evolve( rev_or_rel, author=attr.evolve( rev_or_rel.author, name=b'', email=b'', ) ) if getattr(rev_or_rel, 'committer', None): rev_or_rel = attr.evolve( rev_or_rel, committer=attr.evolve( rev_or_rel.committer, name=b'', email=b'', ) ) return rev_or_rel @given(lists(object_dicts(), min_size=1)) @settings(suppress_health_check=[HealthCheck.too_slow]) def test_write_replay_same_order_batches(objects): queue = [] replayer = MockedJournalClient(queue) with 
patch('swh.journal.writer.inmemory.InMemoryJournalWriter', return_value=MockedKafkaWriter(queue)): storage1 = get_storage(**storage_config) # Write objects to storage1 for (obj_type, obj) in objects: if obj_type == 'content' and obj.get('status') == 'absent': obj_type = 'skipped_content' obj = object_converter_fn[obj_type](obj) if obj_type == 'origin_visit': storage1.origin_add_one(Origin(url=obj.origin)) storage1.origin_visit_upsert([obj]) else: method = getattr(storage1, obj_type + '_add') try: method([obj]) except HashCollision: pass # Bail out early if we didn't insert any relevant objects... queue_size = len(queue) assert queue_size != 0, "No test objects found; hypothesis strategy bug?" assert replayer.stop_after_objects is None replayer.stop_after_objects = queue_size storage2 = get_storage(**storage_config) worker_fn = functools.partial(process_replay_objects, storage=storage2) replayer.process(worker_fn) assert replayer.consumer.committed for attr_name in ('_contents', '_directories', '_snapshots', '_origin_visits', '_origins'): assert getattr(storage1, attr_name) == getattr(storage2, attr_name), \ attr_name # When hypothesis generates a revision and a release with same # author (or committer) fullname but different name or email, then # the storage will use the first name/email it sees. # This first one will be either the one from the revision or the release, # and since there is no order guarantees, storage2 has 1/2 chance of # not seeing the same order as storage1, therefore we need to strip # them out before comparing. for attr_name in ('_revisions', '_releases'): items1 = {k: empty_person_name_email(v) for (k, v) in getattr(storage1, attr_name).items()} items2 = {k: empty_person_name_email(v) for (k, v) in getattr(storage2, attr_name).items()} assert items1 == items2, attr_name # TODO: add test for hash collision @given(lists(present_contents(), min_size=1)) @settings(suppress_health_check=[HealthCheck.too_slow]) def test_write_replay_content(objects): queue = [] replayer = MockedJournalClient(queue) with patch('swh.journal.writer.inmemory.InMemoryJournalWriter', return_value=MockedKafkaWriter(queue)): storage1 = get_storage(**storage_config) contents = [] for obj in objects: storage1.content_add([obj]) contents.append(obj) # Bail out early if we didn't insert any relevant objects... queue_size = len(queue) assert queue_size != 0, "No test objects found; hypothesis strategy bug?" assert replayer.stop_after_objects is None replayer.stop_after_objects = queue_size storage2 = get_storage(**storage_config) objstorage1 = storage1.objstorage.objstorage objstorage2 = storage2.objstorage.objstorage worker_fn = functools.partial(process_replay_objects_content, src=objstorage1, dst=objstorage2) replayer.process(worker_fn) # only content with status visible will be copied in storage2 expected_objstorage_state = { c.sha1: c.data for c in contents if c.status == 'visible' } assert expected_objstorage_state == objstorage2.state diff --git a/version.txt b/version.txt index c514853..1bd474e 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.0.27-0-gd3d5b79 \ No newline at end of file +v0.0.28-0-g7ab4c2c \ No newline at end of file
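For reference, the new `swh/journal/fixer.py` module centralizes the legacy-object rewriting that used to live in `swh/journal/replay.py`: `_insert_objects` now calls `fix_objects(object_type, objects)` before converting dicts to model objects. The sketch below is not part of the patch; the legacy visit dict is a made-up example showing the kind of normalization `fix_objects` performs on an old-format `origin_visit` record:

```python
from datetime import datetime, timezone

from swh.journal.fixer import fix_objects

# Hypothetical legacy record: 'origin' is a dict carrying the visit type,
# and 'metadata' is missing -- the shape old journal topics used to contain.
legacy_visits = [{
    'origin': {'type': 'git', 'url': 'http://example.com/'},
    'date': datetime(2020, 2, 27, 14, 39, 19, tzinfo=timezone.utc),
    'status': 'ongoing',
    'snapshot': None,
}]

fixed = fix_objects('origin_visit', legacy_visits)
assert fixed == [{
    'origin': 'http://example.com/',  # origin dict replaced by its URL
    'type': 'git',                    # type recovered from the old origin dict
    'date': datetime(2020, 2, 27, 14, 39, 19, tzinfo=timezone.utc),
    'status': 'ongoing',
    'metadata': None,                 # missing key filled in
    'snapshot': None,
}]
```

Visits whose type cannot be recovered raise `ValueError`, so the replayer crashes instead of silently inserting bogus data (see `test_write_replay_legacy_origin_visit1`).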