Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/replay.py
# Copyright (C) 2019 The Software Heritage developers | # Copyright (C) 2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from time import time | from time import time | ||||
import logging | import logging | ||||
from concurrent.futures import ThreadPoolExecutor | from concurrent.futures import ThreadPoolExecutor | ||||
from swh.core.statsd import statsd | from swh.core.statsd import statsd | ||||
from swh.model.identifiers import normalize_timestamp | |||||
from swh.model.hashutil import hash_to_hex | from swh.model.hashutil import hash_to_hex | ||||
from swh.model.model import SHA1_SIZE | from swh.model.model import SHA1_SIZE | ||||
from swh.objstorage.objstorage import ID_HASH_ALGO | from swh.objstorage.objstorage import ID_HASH_ALGO | ||||
from swh.storage import HashCollision | from swh.storage import HashCollision | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
def process_replay_objects(all_objects, *, storage): | def process_replay_objects(all_objects, *, storage): | ||||
for (object_type, objects) in all_objects.items(): | for (object_type, objects) in all_objects.items(): | ||||
_insert_objects(object_type, objects, storage) | _insert_objects(object_type, objects, storage) | ||||
def _fix_revisions(revisions): | def _fix_revision_pypi_empty_string(rev): | ||||
for rev in revisions: | """PyPI loader failed to encode empty strings as bytes, see: | ||||
# PyPI loader failed to encode empty strings as bytes, see: | swh:1:rev:8f0095ee0664867055d03de9bcc8f95b91d8a2b9 | ||||
# swh:1:rev:8f0095ee0664867055d03de9bcc8f95b91d8a2b9 | or https://forge.softwareheritage.org/D1772 | ||||
# or https://forge.softwareheritage.org/D1772 | """ | ||||
rev = { | |||||
**rev, | |||||
'author': rev['author'].copy(), | |||||
'committer': rev['committer'].copy(), | |||||
} | |||||
if rev['author'].get('email') == '': | if rev['author'].get('email') == '': | ||||
rev['author']['email'] = b'' | rev['author']['email'] = b'' | ||||
ardumont: Urgh, maybe make that return the new list of revisions instead of mutating the input...
That'd… | |||||
if rev['author'].get('name') == '': | if rev['author'].get('name') == '': | ||||
rev['author']['name'] = b'' | rev['author']['name'] = b'' | ||||
if rev['committer'].get('email') == '': | if rev['committer'].get('email') == '': | ||||
rev['committer']['email'] = b'' | rev['committer']['email'] = b'' | ||||
Done Inline ActionsPlease, add a docstring to explicit the range. ardumont: Please, add a docstring to explicit the range.
People are not all python/postgresql experts so… | |||||
if rev['committer'].get('name') == '': | if rev['committer'].get('name') == '': | ||||
rev['committer']['name'] = b'' | rev['committer']['name'] = b'' | ||||
return rev | |||||
def _check_date(date): | |||||
"""Returns whether the date can be represented in backends with sane | |||||
limits on timestamps and timezeones (resp. signed 64-bits and | |||||
signed 16 bits), and that microseconds is valid (ie. between 0 and 10^6). | |||||
""" | |||||
date = normalize_timestamp(date) | |||||
return (-2**63 <= date['timestamp']['seconds'] < 2**63) \ | |||||
and (0 <= date['timestamp']['microseconds'] < 10**6) \ | |||||
and (-2**15 <= date['offset'] < 2**15) | |||||
def _check_revision_date(rev): | |||||
"""Exclude revisions with invalid dates. | |||||
See https://forge.softwareheritage.org/T1339""" | |||||
return _check_date(rev['date']) and _check_date(rev['committer_date']) | |||||
def _fix_revisions(revisions): | |||||
good_revisions = [] | |||||
for rev in revisions: | |||||
rev = _fix_revision_pypi_empty_string(rev) | |||||
if not _check_revision_date(rev): | |||||
Done Inline ActionsExcluding revision with invalid date fields or anything clearer if you have ;) ardumont: `Excluding revision with invalid date fields` or anything clearer if you have ;) | |||||
logging.warning('Excluding revision (invalid date): %r', rev) | |||||
continue | |||||
good_revisions.append(rev) | |||||
return good_revisions | |||||
def _fix_origin_visits(visits): | def _fix_origin_visits(visits): | ||||
good_visits = [] | |||||
for visit in visits: | for visit in visits: | ||||
visit = visit.copy() | |||||
if isinstance(visit['origin'], str): | if isinstance(visit['origin'], str): | ||||
# note that it will crash with the pg and | # note that it will crash with the pg and | ||||
# in-mem storages if the origin is not already known, | # in-mem storages if the origin is not already known, | ||||
# but there is no other choice because we can't add an | # but there is no other choice because we can't add an | ||||
# origin without knowing its type. Non-pg storages | # origin without knowing its type. Non-pg storages | ||||
# don't use a numeric FK internally, | # don't use a numeric FK internally, | ||||
visit['origin'] = {'url': visit['origin']} | visit['origin'] = {'url': visit['origin']} | ||||
else: | else: | ||||
if 'type' not in visit: | if 'type' not in visit: | ||||
visit['type'] = visit['origin']['type'] | visit['type'] = visit['origin']['type'] | ||||
good_visits.append(visit) | |||||
return good_visits | |||||
def fix_objects(object_type, objects): | def fix_objects(object_type, objects): | ||||
"""Converts a possibly old object from the journal to its current | """Converts a possibly old object from the journal to its current | ||||
expected format. | expected format. | ||||
List of conversions: | List of conversions: | ||||
Empty author name/email in PyPI releases: | Empty author name/email in PyPI releases: | ||||
>>> from pprint import pprint | >>> from pprint import pprint | ||||
>>> date = { | |||||
... 'timestamp': { | |||||
... 'seconds': 1565096932, | |||||
... 'microseconds': 0, | |||||
... }, | |||||
... 'offset': 0, | |||||
... } | |||||
>>> pprint(fix_objects('revision', [{ | >>> pprint(fix_objects('revision', [{ | ||||
... 'author': {'email': '', 'fullname': b'', 'name': ''}, | ... 'author': {'email': '', 'fullname': b'', 'name': ''}, | ||||
... 'committer': {'email': '', 'fullname': b'', 'name': ''}, | ... 'committer': {'email': '', 'fullname': b'', 'name': ''}, | ||||
... 'date': date, | |||||
... 'committer_date': date, | |||||
... }])) | ... }])) | ||||
[{'author': {'email': b'', 'fullname': b'', 'name': b''}, | [{'author': {'email': b'', 'fullname': b'', 'name': b''}, | ||||
'committer': {'email': b'', 'fullname': b'', 'name': b''}}] | 'committer': {'email': b'', 'fullname': b'', 'name': b''}, | ||||
'committer_date': {'offset': 0, | |||||
'timestamp': {'microseconds': 0, 'seconds': 1565096932}}, | |||||
'date': {'offset': 0, | |||||
'timestamp': {'microseconds': 0, 'seconds': 1565096932}}}] | |||||
Filter out revisions with invalid dates: | |||||
>>> from copy import deepcopy | |||||
>>> invalid_date1 = deepcopy(date) | |||||
>>> invalid_date1['timestamp']['microseconds'] = 1000000000 # > 10^6 | |||||
>>> fix_objects('revision', [{ | |||||
... 'author': {'email': '', 'fullname': b'', 'name': b''}, | |||||
... 'committer': {'email': '', 'fullname': b'', 'name': b''}, | |||||
... 'date': invalid_date1, | |||||
... 'committer_date': date, | |||||
... }]) | |||||
[] | |||||
>>> invalid_date2 = deepcopy(date) | |||||
>>> invalid_date2['timestamp']['seconds'] = 2**70 # > 10^63 | |||||
>>> fix_objects('revision', [{ | |||||
... 'author': {'email': '', 'fullname': b'', 'name': b''}, | |||||
... 'committer': {'email': '', 'fullname': b'', 'name': b''}, | |||||
... 'date': invalid_date2, | |||||
... 'committer_date': date, | |||||
... }]) | |||||
[] | |||||
>>> invalid_date3 = deepcopy(date) | |||||
>>> invalid_date3['offset'] = 2**20 # > 10^15 | |||||
>>> fix_objects('revision', [{ | |||||
... 'author': {'email': '', 'fullname': b'', 'name': b''}, | |||||
... 'committer': {'email': '', 'fullname': b'', 'name': b''}, | |||||
... 'date': date, | |||||
... 'committer_date': invalid_date3, | |||||
... }]) | |||||
[] | |||||
`visit['origin']` is an URL instead of a dict: | `visit['origin']` is an URL instead of a dict: | ||||
>>> fix_objects('origin_visit', [{'origin': 'http://foo'}]) | >>> fix_objects('origin_visit', [{'origin': 'http://foo'}]) | ||||
[{'origin': {'url': 'http://foo'}}] | [{'origin': {'url': 'http://foo'}}] | ||||
`visit['type']` is missing , but `origin['visit']['type']` exists: | `visit['type']` is missing , but `origin['visit']['type']` exists: | ||||
>>> pprint(fix_objects( | >>> pprint(fix_objects( | ||||
... 'origin_visit', | ... 'origin_visit', | ||||
... [{'origin': {'type': 'hg', 'url': 'http://foo'}}])) | ... [{'origin': {'type': 'hg', 'url': 'http://foo'}}])) | ||||
[{'origin': {'type': 'hg', 'url': 'http://foo'}, 'type': 'hg'}] | [{'origin': {'type': 'hg', 'url': 'http://foo'}, 'type': 'hg'}] | ||||
""" | """ # noqa | ||||
if object_type == 'revision': | if object_type == 'revision': | ||||
_fix_revisions(objects) | objects = _fix_revisions(objects) | ||||
elif object_type == 'origin_visit': | elif object_type == 'origin_visit': | ||||
_fix_origin_visits(objects) | objects = _fix_origin_visits(objects) | ||||
return objects | return objects | ||||
def _insert_objects(object_type, objects, storage): | def _insert_objects(object_type, objects, storage): | ||||
fix_objects(object_type, objects) | objects = fix_objects(object_type, objects) | ||||
if object_type == 'content': | if object_type == 'content': | ||||
# TODO: insert 'content' in batches | # TODO: insert 'content' in batches | ||||
for object_ in objects: | for object_ in objects: | ||||
try: | try: | ||||
storage.content_add_metadata([object_]) | storage.content_add_metadata([object_]) | ||||
except HashCollision as e: | except HashCollision as e: | ||||
logger.error('Hash collision: %s', e.args) | logger.error('Hash collision: %s', e.args) | ||||
▲ Show 20 Lines • Show All 159 Lines • Show Last 20 Lines |
Urgh, maybe make that return the new list of revisions instead of mutating the input...
That'd be clearer.