diff --git a/swh/journal/fixer.py b/swh/journal/fixer.py
new file mode 100644
--- /dev/null
+++ b/swh/journal/fixer.py
@@ -0,0 +1,293 @@
+import copy
+import logging
+from typing import Any, Dict, List, Optional
+from swh.model.identifiers import normalize_timestamp
+
+logger = logging.getLogger(__name__)
+
+
+def _fix_content(content: Dict[str, Any]) -> Dict[str, Any]:
+    """Filters-out invalid 'perms' key that leaked from swh.model.from_disk
+    to the journal.
+
+    >>> _fix_content({'perms': 0o100644, 'sha1_git': b'foo'})
+    {'sha1_git': b'foo'}
+
+    >>> _fix_content({'sha1_git': b'bar'})
+    {'sha1_git': b'bar'}
+
+    """
+    content = content.copy()
+    content.pop('perms', None)
+    return content
+
+
+def _fix_revision_pypi_empty_string(rev: Dict[str, Any]) -> Dict[str, Any]:
+    """PyPI loader failed to encode empty strings as bytes, see:
+    swh:1:rev:8f0095ee0664867055d03de9bcc8f95b91d8a2b9
+    or https://forge.softwareheritage.org/D1772
+    """
+    rev = {
+        **rev,
+        'author': rev['author'].copy(),
+        'committer': rev['committer'].copy(),
+    }
+    if rev['author'].get('email') == '':
+        rev['author']['email'] = b''
+    if rev['author'].get('name') == '':
+        rev['author']['name'] = b''
+    if rev['committer'].get('email') == '':
+        rev['committer']['email'] = b''
+    if rev['committer'].get('name') == '':
+        rev['committer']['name'] = b''
+    return rev
+
+
+def _fix_revision_transplant_source(rev: Dict[str, Any]) -> Dict[str, Any]:
+    """Encode str values of 'transplant_source' extra headers as bytes.
+
+    Revisions without extra headers are returned untouched; otherwise the
+    fix is applied to a deep copy, so the caller's revision is never
+    mutated.
+    """
+    if rev.get('metadata') and rev['metadata'].get('extra_headers'):
+        rev = copy.deepcopy(rev)
+        rev['metadata']['extra_headers'] = [
+            [key, value.encode('ascii')]
+            if key == 'transplant_source' and isinstance(value, str)
+            else [key, value]
+            for (key, value) in rev['metadata']['extra_headers']]
+    return rev
+
+
+def _check_date(date) -> bool:
+    """Returns whether the date can be represented in backends with sane
+    limits on timestamps and timezones (resp. signed 64-bits and
+    signed 16 bits), and that microseconds is valid (ie. between 0 and 10^6).
+
+    A missing date (`None`) is considered valid.
+    """
+    if date is None:
+        return True
+    date = normalize_timestamp(date)
+    return (-2**63 <= date['timestamp']['seconds'] < 2**63) \
+        and (0 <= date['timestamp']['microseconds'] < 10**6) \
+        and (-2**15 <= date['offset'] < 2**15)
+
+
+def _check_revision_date(rev: Dict[str, Any]) -> bool:
+    """Return whether both dates of a revision pass `_check_date`.
+
+    Used to exclude revisions with invalid dates, see
+    https://forge.softwareheritage.org/T1339
+    """
+    return _check_date(rev['date']) and _check_date(rev['committer_date'])
+
+
+def _fix_revision(revision: Dict[str, Any]) -> Optional[Dict]:
+    """Fix various legacy revision issues.
+
+    Fix author/committer person:
+
+    >>> from pprint import pprint
+    >>> date = {
+    ...     'timestamp': {
+    ...         'seconds': 1565096932,
+    ...         'microseconds': 0,
+    ...     },
+    ...     'offset': 0,
+    ... }
+    >>> rev0 = _fix_revision({
+    ...     'id': b'rev-id',
+    ...     'author': {'fullname': b'', 'name': '', 'email': ''},
+    ...     'committer': {'fullname': b'', 'name': '', 'email': ''},
+    ...     'date': date,
+    ...     'committer_date': date,
+    ...     'type': 'git',
+    ...     'message': '',
+    ...     'directory': b'dir-id',
+    ...     'synthetic': False,
+    ... })
+    >>> rev0['author']
+    {'fullname': b'', 'name': b'', 'email': b''}
+    >>> rev0['committer']
+    {'fullname': b'', 'name': b'', 'email': b''}
+
+    Fix type of 'transplant_source' extra headers:
+
+    >>> rev1 = _fix_revision({
+    ...     'id': b'rev-id',
+    ...     'author': {'fullname': b'', 'name': '', 'email': ''},
+    ...     'committer': {'fullname': b'', 'name': '', 'email': ''},
+    ...     'date': date,
+    ...     'committer_date': date,
+    ...     'metadata': {
+    ...         'extra_headers': [
+    ...             ['time_offset_seconds', b'-3600'],
+    ...             ['transplant_source', '29c154a012a70f49df983625090434587622b39e']
+    ...         ]},
+    ...     'type': 'git',
+    ...     'message': '',
+    ...     'directory': b'dir-id',
+    ...     'synthetic': False,
+    ... })
+    >>> pprint(rev1['metadata']['extra_headers'])
+    [['time_offset_seconds', b'-3600'],
+     ['transplant_source', b'29c154a012a70f49df983625090434587622b39e']]
+
+    Revision with invalid date are filtered:
+
+    >>> from copy import deepcopy
+    >>> invalid_date1 = deepcopy(date)
+    >>> invalid_date1['timestamp']['microseconds'] = 1000000000  # > 10^6
+    >>> rev = _fix_revision({
+    ...     'author': {'fullname': b'', 'name': '', 'email': ''},
+    ...     'committer': {'fullname': b'', 'name': '', 'email': ''},
+    ...     'date': invalid_date1,
+    ...     'committer_date': date,
+    ... })
+    >>> rev is None
+    True
+
+    >>> invalid_date2 = deepcopy(date)
+    >>> invalid_date2['timestamp']['seconds'] = 2**70  # >= 2^63
+    >>> rev = _fix_revision({
+    ...     'author': {'fullname': b'', 'name': '', 'email': ''},
+    ...     'committer': {'fullname': b'', 'name': '', 'email': ''},
+    ...     'date': invalid_date2,
+    ...     'committer_date': date,
+    ... })
+    >>> rev is None
+    True
+
+    >>> invalid_date3 = deepcopy(date)
+    >>> invalid_date3['offset'] = 2**20  # >= 2^15
+    >>> rev = _fix_revision({
+    ...     'author': {'fullname': b'', 'name': '', 'email': ''},
+    ...     'committer': {'fullname': b'', 'name': '', 'email': ''},
+    ...     'date': date,
+    ...     'committer_date': invalid_date3,
+    ... })
+    >>> rev is None
+    True
+
+    """  # noqa
+    rev = _fix_revision_pypi_empty_string(revision)
+    rev = _fix_revision_transplant_source(rev)
+    if not _check_revision_date(rev):
+        logger.warning('Invalid revision date detected: %(revision)s', {
+            'revision': rev
+        })
+        return None
+    return rev
+
+
+def _fix_origin_visit(visit: Dict) -> Dict:
+    """Fix various legacy origin visit issues.
+
+    `visit['origin']` is a dict instead of an URL:
+
+    >>> from datetime import datetime, timezone
+    >>> from pprint import pprint
+    >>> date = datetime(2020, 2, 27, 14, 39, 19, tzinfo=timezone.utc)
+    >>> pprint(_fix_origin_visit({
+    ...     'origin': {'url': 'http://foo'},
+    ...     'date': date,
+    ...     'type': 'git',
+    ...     'status': 'ongoing',
+    ...     'snapshot': None,
+    ... }))
+    {'date': datetime.datetime(2020, 2, 27, 14, 39, 19, tzinfo=datetime.timezone.utc),
+     'metadata': None,
+     'origin': 'http://foo',
+     'snapshot': None,
+     'status': 'ongoing',
+     'type': 'git'}
+
+    `visit['type']` is missing, but `visit['origin']['type']` exists:
+
+    >>> pprint(_fix_origin_visit(
+    ...     {'origin': {'type': 'hg', 'url': 'http://foo'},
+    ...      'date': date,
+    ...      'status': 'ongoing',
+    ...      'snapshot': None,
+    ... }))
+    {'date': datetime.datetime(2020, 2, 27, 14, 39, 19, tzinfo=datetime.timezone.utc),
+     'metadata': None,
+     'origin': 'http://foo',
+     'snapshot': None,
+     'status': 'ongoing',
+     'type': 'hg'}
+
+    Old visit format (origin_visit with no type) raises:
+
+    >>> _fix_origin_visit({
+    ...     'origin': {'url': 'http://foo'},
+    ...     'date': date,
+    ...     'status': 'ongoing',
+    ...     'snapshot': None
+    ... })
+    Traceback (most recent call last):
+    ...
+    ValueError: Old origin visit format detected...
+
+    >>> _fix_origin_visit({
+    ...     'origin': 'http://foo',
+    ...     'date': date,
+    ...     'status': 'ongoing',
+    ...     'snapshot': None
+    ... })
+    Traceback (most recent call last):
+    ...
+    ValueError: Old origin visit format detected...
+
+    """  # noqa
+    visit = visit.copy()
+    if 'type' not in visit:
+        if isinstance(visit['origin'], dict) and 'type' in visit['origin']:
+            # Very old version of the schema: visits did not have a type,
+            # but their 'origin' field was a dict with a 'type' key.
+            visit['type'] = visit['origin']['type']
+        else:
+            # Very old schema version: 'type' is missing, stop early
+
+            # We expect the journal's origin_visit topic to no longer reference
+            # such visits. If it does, the replayer must crash so we can fix
+            # the journal's topic.
+            raise ValueError(f'Old origin visit format detected: {visit}')
+    if isinstance(visit['origin'], dict):
+        # Old version of the schema: visit['origin'] was a dict.
+        visit['origin'] = visit['origin']['url']
+    if 'metadata' not in visit:
+        visit['metadata'] = None
+    return visit
+
+
+def fix_objects(object_type: str, objects: List[Dict]) -> List[Dict]:
+    """
+    Fix legacy objects from the journal to bring them up to date with the
+    latest storage schema.
+
+    Args:
+        object_type: type of the objects; only 'content', 'revision' and
+            'origin_visit' objects need fixing
+        objects: objects of that type, as dicts; they are not mutated
+
+    Returns:
+        The fixed objects. Revisions with an invalid date are dropped, so
+        the result may be shorter than `objects`. Objects of any other type
+        are returned as is.
+
+    Raises:
+        ValueError: an origin visit has no 'type' (old visit format)
+
+    """
+    if object_type == 'content':
+        return [_fix_content(v) for v in objects]
+    elif object_type == 'revision':
+        revisions = [_fix_revision(v) for v in objects]
+        return [rev for rev in revisions if rev is not None]
+    elif object_type == 'origin_visit':
+        return [_fix_origin_visit(v) for v in objects]
+    else:
+        return objects
diff --git a/swh/journal/replay.py b/swh/journal/replay.py
--- a/swh/journal/replay.py
+++ b/swh/journal/replay.py
@@ -3,7 +3,6 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-import copy
 import logging
 from time import time
 from typing import (
@@ -22,7 +21,7 @@
 )
 
 from swh.core.statsd import statsd
-from swh.model.identifiers import normalize_timestamp
+from swh.journal.fixer import fix_objects
 from swh.model.hashutil import hash_to_hex
 
 from swh.model.model import (
@@ -68,253 +67,6 @@
         notify('WATCHDOG=1')
 
 
-def _fix_content(content: Dict[str, Any]) -> Dict[str, Any]:
-    """Filters-out invalid 'perms' key that leaked from swh.model.from_disk
-    to the journal.
-
-    >>> _fix_content({'perms': 0o100644, 'sha1_git': b'foo'})
-    {'sha1_git': b'foo'}
-
-    >>> _fix_content({'sha1_git': b'bar'})
-    {'sha1_git': b'bar'}
-
-    """
-    content = content.copy()
-    content.pop('perms', None)
-    return content
-
-
-def _fix_revision_pypi_empty_string(rev):
-    """PyPI loader failed to encode empty strings as bytes, see:
-    swh:1:rev:8f0095ee0664867055d03de9bcc8f95b91d8a2b9
-    or https://forge.softwareheritage.org/D1772
-    """
-    rev = {
-        **rev,
-        'author': rev['author'].copy(),
-        'committer': rev['committer'].copy(),
-    }
-    if rev['author'].get('email') == '':
-        rev['author']['email'] = b''
-    if rev['author'].get('name') == '':
-        rev['author']['name'] = b''
-    if rev['committer'].get('email') == '':
-        rev['committer']['email'] = b''
-    if rev['committer'].get('name') == '':
-        rev['committer']['name'] = b''
-    return rev
-
-
-def _fix_revision_transplant_source(rev):
-    if rev.get('metadata') and rev['metadata'].get('extra_headers'):
-        rev = copy.deepcopy(rev)
-        rev['metadata']['extra_headers'] = [
-            [key, value.encode('ascii')]
-            if key == 'transplant_source' and isinstance(value, str)
-            else [key, value]
-            for (key, value) in rev['metadata']['extra_headers']]
-    return rev
-
-
-def _check_date(date):
-    """Returns whether the date can be represented in backends with sane
-    limits on timestamps and timezones (resp. signed 64-bits and
-    signed 16 bits), and that microseconds is valid (ie. between 0 and 10^6).
-    """
-    if date is None:
-        return True
-    date = normalize_timestamp(date)
-    return (-2**63 <= date['timestamp']['seconds'] < 2**63) \
-        and (0 <= date['timestamp']['microseconds'] < 10**6) \
-        and (-2**15 <= date['offset'] < 2**15)
-
-
-def _check_revision_date(rev):
-    """Exclude revisions with invalid dates.
-    See https://forge.softwareheritage.org/T1339"""
-    return _check_date(rev['date']) and _check_date(rev['committer_date'])
-
-
-def _fix_revision(revision: Dict[str, Any]) -> Optional[Revision]:
-    """Adapt revision into an swh revision model object (current storage
-    compatible).
-
-    Fix author/committer person:
-
-    >>> from pprint import pprint
-    >>> date = {
-    ...     'timestamp': {
-    ...         'seconds': 1565096932,
-    ...         'microseconds': 0,
-    ...     },
-    ...     'offset': 0,
-    ... }
-    >>> rev0 = _fix_revision({
-    ...     'id': b'rev-id',
-    ...     'author': {'email': '', 'fullname': b'', 'name': ''},
-    ...     'committer': {'email': '', 'fullname': b'', 'name': ''},
-    ...     'date': date,
-    ...     'committer_date': date,
-    ...     'type': 'git',
-    ...     'message': '',
-    ...     'directory': b'dir-id',
-    ...     'synthetic': False,
-    ... }).to_dict()
-    >>> rev0['author']
-    {'fullname': b'', 'name': b'', 'email': b''}
-    >>> rev0['committer']
-    {'fullname': b'', 'name': b'', 'email': b''}
-
-    Fix type of 'transplant_source' extra headers:
-
-    >>> rev1 = _fix_revision({
-    ...     'id': b'rev-id',
-    ...     'author': {'email': '', 'fullname': b'', 'name': ''},
-    ...     'committer': {'email': '', 'fullname': b'', 'name': ''},
-    ...     'date': date,
-    ...     'committer_date': date,
-    ...     'metadata': {
-    ...         'extra_headers': [
-    ...             ['time_offset_seconds', b'-3600'],
-    ...             ['transplant_source', '29c154a012a70f49df983625090434587622b39e']
-    ...         ]},
-    ...     'type': 'git',
-    ...     'message': '',
-    ...     'directory': b'dir-id',
-    ...     'synthetic': False,
-    ... })
-    >>> pprint(rev1.metadata['extra_headers'])
-    [['time_offset_seconds', b'-3600'],
-     ['transplant_source', b'29c154a012a70f49df983625090434587622b39e']]
-
-    Revision with invalid date are filtered:
-
-    >>> from copy import deepcopy
-    >>> invalid_date1 = deepcopy(date)
-    >>> invalid_date1['timestamp']['microseconds'] = 1000000000  # > 10^6
-    >>> rev = _fix_revision({
-    ...     'author': {'email': '', 'fullname': b'', 'name': b''},
-    ...     'committer': {'email': '', 'fullname': b'', 'name': b''},
-    ...     'date': invalid_date1,
-    ...     'committer_date': date,
-    ... })
-    >>> rev is None
-    True
-
-    >>> invalid_date2 = deepcopy(date)
-    >>> invalid_date2['timestamp']['seconds'] = 2**70  # > 10^63
-    >>> rev = _fix_revision({
-    ...     'author': {'email': '', 'fullname': b'', 'name': b''},
-    ...     'committer': {'email': '', 'fullname': b'', 'name': b''},
-    ...     'date': invalid_date2,
-    ...     'committer_date': date,
-    ... })
-    >>> rev is None
-    True
-
-    >>> invalid_date3 = deepcopy(date)
-    >>> invalid_date3['offset'] = 2**20  # > 10^15
-    >>> rev = _fix_revision({
-    ...     'author': {'email': '', 'fullname': b'', 'name': b''},
-    ...     'committer': {'email': '', 'fullname': b'', 'name': b''},
-    ...     'date': date,
-    ...     'committer_date': invalid_date3,
-    ... })
-    >>> rev is None
-    True
-
-    """  # noqa
-    rev = _fix_revision_pypi_empty_string(revision)
-    rev = _fix_revision_transplant_source(rev)
-    if not _check_revision_date(rev):
-        logger.warning('Invalid revision date detected: %(revision)s', {
-            'revision': rev
-        })
-        return None
-    return Revision.from_dict(rev)
-
-
-def _fix_origin_visit(visit: Dict) -> OriginVisit:
-    """Adapt origin visit into current storage compatible OriginVisit.
-
-    `visit['origin']` is a dict instead of an URL:
-
-    >>> from datetime import datetime, timezone
-    >>> from pprint import pprint
-    >>> date = datetime(2020, 2, 27, 14, 39, 19, tzinfo=timezone.utc)
-    >>> pprint(_fix_origin_visit({
-    ...     'origin': {'url': 'http://foo'},
-    ...     'date': date,
-    ...     'type': 'git',
-    ...     'status': 'ongoing',
-    ...     'snapshot': None,
-    ... }).to_dict())
-    {'date': datetime.datetime(2020, 2, 27, 14, 39, 19, tzinfo=datetime.timezone.utc),
-     'metadata': None,
-     'origin': 'http://foo',
-     'snapshot': None,
-     'status': 'ongoing',
-     'type': 'git'}
-
-    `visit['type']` is missing , but `origin['visit']['type']` exists:
-
-    >>> pprint(_fix_origin_visit(
-    ...     {'origin': {'type': 'hg', 'url': 'http://foo'},
-    ...      'date': date,
-    ...      'status': 'ongoing',
-    ...      'snapshot': None,
-    ... }).to_dict())
-    {'date': datetime.datetime(2020, 2, 27, 14, 39, 19, tzinfo=datetime.timezone.utc),
-     'metadata': None,
-     'origin': 'http://foo',
-     'snapshot': None,
-     'status': 'ongoing',
-     'type': 'hg'}
-
-    Old visit format (origin_visit with no type) raises:
-
-    >>> _fix_origin_visit({
-    ...     'origin': {'url': 'http://foo'},
-    ...     'date': date,
-    ...     'status': 'ongoing',
-    ...     'snapshot': None
-    ... })
-    Traceback (most recent call last):
-    ...
-    ValueError: Old origin visit format detected...
-
-    >>> _fix_origin_visit({
-    ...     'origin': 'http://foo',
-    ...     'date': date,
-    ...     'status': 'ongoing',
-    ...     'snapshot': None
-    ... })
-    Traceback (most recent call last):
-    ...
-    ValueError: Old origin visit format detected...
-
-    """  # noqa
-    visit = visit.copy()
-    if 'type' not in visit:
-        if isinstance(visit['origin'], dict) and 'type' in visit['origin']:
-            # Very old version of the schema: visits did not have a type,
-            # but their 'origin' field was a dict with a 'type' key.
-            visit['type'] = visit['origin']['type']
-        else:
-            # Very old schema version: 'type' is missing, stop early
-
-            # We expect the journal's origin_visit topic to no longer reference
-            # such visits. If it does, the replayer must crash so we can fix
-            # the journal's topic.
-            raise ValueError(f'Old origin visit format detected: {visit}')
-    if isinstance(visit['origin'], dict):
-        # Old version of the schema: visit['origin'] was a dict.
-        visit['origin'] = visit['origin']['url']
-    if 'metadata' not in visit:
-        visit['metadata'] = None
-    return OriginVisit.from_dict(visit)
-
-
 def collision_aware_content_add(
         content_add_fn: Callable[[Iterable[Any]], None],
         contents: List[BaseContent]) -> None:
@@ -358,11 +110,13 @@
     """Insert objects of type object_type in the storage.
 
     """
+    objects = fix_objects(object_type, objects)
+
     if object_type == 'content':
         contents: List[BaseContent] = []
         skipped_contents: List[BaseContent] = []
         for content in objects:
-            c = BaseContent.from_dict(_fix_content(content))
+            c = BaseContent.from_dict(content)
             if isinstance(c, SkippedContent):
                 skipped_contents.append(c)
             else:
@@ -372,23 +126,18 @@
             storage.skipped_content_add, skipped_contents)
         collision_aware_content_add(
             storage.content_add_metadata, contents)
-    elif object_type == 'revision':
-        revisions: List[Revision] = []
-        for revision in objects:
-            rev = _fix_revision(revision)
-            if rev:
-                revisions.append(rev)
-        storage.revision_add(revisions)
     elif object_type == 'origin_visit':
         visits: List[OriginVisit] = []
         origins: List[Origin] = []
         for obj in objects:
-            visit = _fix_origin_visit(obj)
+            visit = OriginVisit.from_dict(obj)
             visits.append(visit)
             origins.append(Origin(url=visit.origin))
         storage.origin_add(origins)
         storage.origin_visit_upsert(visits)
-    elif object_type in ('directory', 'release', 'snapshot', 'origin'):
+    elif object_type in (
+        'directory', 'revision', 'release', 'snapshot', 'origin'
+    ):
         method = getattr(storage, object_type + '_add')
         method(object_converter_fn[object_type](o) for o in objects)
     else: