swh/journal/replay.py
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import copy
import logging
from time import time
-from typing import Any, Callable, Dict, Iterable, List, Optional
+from typing import (
+    Any, Callable, Dict, Iterable, List, Optional
+)

from sentry_sdk import capture_exception, push_scope

try:
    from systemd.daemon import notify
except ImportError:
    notify = None

from tenacity import (
    retry, retry_if_exception_type, stop_after_attempt,
    wait_random_exponential,
)

from swh.core.statsd import statsd
from swh.model.identifiers import normalize_timestamp
from swh.model.hashutil import hash_to_hex
-from swh.model.model import BaseContent, SkippedContent, SHA1_SIZE
+from swh.model.model import (
+    BaseContent, BaseModel, Content, Directory, Origin, Revision,
+    SHA1_SIZE, SkippedContent, Snapshot, Release
+)
from swh.objstorage.objstorage import (
    ID_HASH_ALGO, ObjNotFoundError, ObjStorage,
)
from swh.storage import HashCollision

logger = logging.getLogger(__name__)

GRAPH_OPERATIONS_METRIC = "swh_graph_replayer_operations_total"
GRAPH_DURATION_METRIC = "swh_graph_replayer_duration_seconds"
CONTENT_OPERATIONS_METRIC = "swh_content_replayer_operations_total"
CONTENT_RETRY_METRIC = "swh_content_replayer_retries_total"
CONTENT_BYTES_METRIC = "swh_content_replayer_bytes"
CONTENT_DURATION_METRIC = "swh_content_replayer_duration_seconds"

+object_converter_fn: Dict[str, Callable[[Dict], BaseModel]] = {
+    'origin': Origin.from_dict,
+    'snapshot': Snapshot.from_dict,
+    'revision': Revision.from_dict,
+    'release': Release.from_dict,
+    'directory': Directory.from_dict,
+    'content': Content.from_dict,
+    'skipped_content': SkippedContent.from_dict,
+}
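
As an aside (not part of this changeset): each converter in object_converter_fn turns a dict read from the journal into the corresponding swh.model object. For instance, with a minimal, hypothetical origin dict:

origin = object_converter_fn['origin']({'url': 'https://example.com/repo'})
assert origin.url == 'https://example.com/repo'
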

def process_replay_objects(all_objects, *, storage):
    for (object_type, objects) in all_objects.items():
        logger.debug("Inserting %s %s objects", len(objects), object_type)
        with statsd.timed(GRAPH_DURATION_METRIC,
                          tags={'object_type': object_type}):
            _insert_objects(object_type, objects, storage)
        statsd.increment(GRAPH_OPERATIONS_METRIC, len(objects),
                         tags={'object_type': object_type})
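
For context, a rough usage sketch (not part of this changeset): process_replay_objects is meant to be partially applied and passed as the worker function of a journal client, along the lines of what the replay CLI does. The configuration values below are placeholders.

import functools

from swh.journal.client import JournalClient
from swh.storage import get_storage

storage = get_storage(**storage_config)          # placeholder storage configuration
client = JournalClient(**journal_client_config)  # placeholder broker/group/prefix settings
worker_fn = functools.partial(process_replay_objects, storage=storage)
client.process(worker_fn)
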

[... 46 lines not shown ...]

    return (-2**63 <= date['timestamp']['seconds'] < 2**63) \
        and (-2**15 <= date['offset'] < 2**15)


def _check_revision_date(rev):
    """Exclude revisions with invalid dates.
    See https://forge.softwareheritage.org/T1339"""
    return _check_date(rev['date']) and _check_date(rev['committer_date'])


-def _fix_revisions(revisions):
+def _fix_revisions(revisions: List[Dict]) -> List[Dict]:

vlorentz: If you merge D2813 first, you don't need to return `List[Dict]`.
And instead of `Tuple[List[SkippedContent], List[Content]]`, you can return
`List[BaseContent]` and call isinstance when iterating the list.

ardumont (done): I don't want to iterate again on the list... I want the
iteration to take place once and for…

vlorentz (not done): Instead of giving `fix_and_convert_objects` three
completely different behaviors according to object types, you should split it
into three different functions, one for each of the behaviors.

ardumont (done): In that regard, i was tempted to:
- remove completely this function indirection
- do the match…
and i did.
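
A rough sketch (not part of this changeset) of the List[BaseContent]-plus-isinstance approach vlorentz suggests above, once D2813 (referenced in the comments) lets storage accept model objects; the helper names here are hypothetical:

from typing import Dict, List

from swh.model.model import BaseContent, SkippedContent


def _fix_contents(contents: List[Dict]) -> List[BaseContent]:
    # BaseContent.from_dict builds either a Content or a SkippedContent,
    # depending on the input dict.
    return [BaseContent.from_dict(d) for d in contents]


def _insert_contents(objects: List[Dict], storage) -> None:
    contents = _fix_contents(objects)
    collision_aware_content_add(
        storage.skipped_content_add,
        [c for c in contents if isinstance(c, SkippedContent)])
    collision_aware_content_add(
        storage.content_add_metadata,
        [c for c in contents if not isinstance(c, SkippedContent)])
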
-    good_revisions = []
-    for rev in revisions:
-        rev = _fix_revision_pypi_empty_string(rev)
-        rev = _fix_revision_transplant_source(rev)
-        if not _check_revision_date(rev):
-            logging.warning('Excluding revision (invalid date): %r', rev)
-            continue
-        if rev not in good_revisions:
-            good_revisions.append(rev)
-    return good_revisions
-
-
-def _fix_origin_visits(visits):
-    good_visits = []
-    for visit in visits:
-        visit = visit.copy()
-        if 'type' not in visit:
-            if isinstance(visit['origin'], dict) and 'type' in visit['origin']:
-                # Very old version of the schema: visits did not have a type,
-                # but their 'origin' field was a dict with a 'type' key.
-                visit['type'] = visit['origin']['type']
-            else:
-                # Very very old version of the schema: 'type' is missing,
-                # so there is nothing we can do to fix it.
-                raise ValueError('Got an origin_visit too old to be replayed.')
-        if isinstance(visit['origin'], dict):
-            # Old version of the schema: visit['origin'] was a dict.
-            visit['origin'] = visit['origin']['url']
-        good_visits.append(visit)
-    return good_visits
-
-
-def fix_objects(object_type, objects):
-    """Converts a possibly old object from the journal to its current
-    expected format.
-
-    List of conversions:
-
-    Empty author name/email in PyPI releases:
-
+    """Adapt revisions into a list of (current) storage compatible dicts.
+
    >>> from pprint import pprint

ardumont (done): Working on this.

ardumont (done): Related to D2813

    >>> date = {
    ...     'timestamp': {
    ...         'seconds': 1565096932,
    ...         'microseconds': 0,
    ...     },
    ...     'offset': 0,
    ... }
-    >>> pprint(fix_objects('revision', [{
+    >>> pprint(_fix_revisions([{
    ...     'author': {'email': '', 'fullname': b'', 'name': ''},
    ...     'committer': {'email': '', 'fullname': b'', 'name': ''},
    ...     'date': date,
    ...     'committer_date': date,
    ... }]))
    [{'author': {'email': b'', 'fullname': b'', 'name': b''},
      'committer': {'email': b'', 'fullname': b'', 'name': b''},
      'committer_date': {'offset': 0,
                         'timestamp': {'microseconds': 0, 'seconds': 1565096932}},
      'date': {'offset': 0,
               'timestamp': {'microseconds': 0, 'seconds': 1565096932}}}]

    Fix type of 'transplant_source' extra headers:

-    >>> revs = fix_objects('revision', [{
+    >>> revs = _fix_revisions([{
    ...     'author': {'email': '', 'fullname': b'', 'name': ''},
    ...     'committer': {'email': '', 'fullname': b'', 'name': ''},
    ...     'date': date,
    ...     'committer_date': date,
    ...     'metadata': {
    ...         'extra_headers': [
    ...             ['time_offset_seconds', b'-3600'],
-    ...             ['transplant_source', '29c154a012a70f49df983625090434587622b39e']
+    ...             ['transplant_source', '29c154a012a70f49df983625090434587622b39e']  # noqa
    ...     ]}
    ... }])
    >>> pprint(revs[0]['metadata']['extra_headers'])
    [['time_offset_seconds', b'-3600'],
     ['transplant_source', b'29c154a012a70f49df983625090434587622b39e']]

    Filter out revisions with invalid dates:

    >>> from copy import deepcopy
    >>> invalid_date1 = deepcopy(date)
    >>> invalid_date1['timestamp']['microseconds'] = 1000000000  # > 10^6
-    >>> fix_objects('revision', [{
+    >>> _fix_revisions([{
    ...     'author': {'email': '', 'fullname': b'', 'name': b''},
    ...     'committer': {'email': '', 'fullname': b'', 'name': b''},
    ...     'date': invalid_date1,
    ...     'committer_date': date,
    ... }])
    []
    >>> invalid_date2 = deepcopy(date)
    >>> invalid_date2['timestamp']['seconds'] = 2**70  # > 10^63
-    >>> fix_objects('revision', [{
+    >>> _fix_revisions([{
    ...     'author': {'email': '', 'fullname': b'', 'name': b''},
    ...     'committer': {'email': '', 'fullname': b'', 'name': b''},
    ...     'date': invalid_date2,
    ...     'committer_date': date,
    ... }])
    []
    >>> invalid_date3 = deepcopy(date)
    >>> invalid_date3['offset'] = 2**20  # > 10^15
-    >>> fix_objects('revision', [{
+    >>> _fix_revisions([{
    ...     'author': {'email': '', 'fullname': b'', 'name': b''},
    ...     'committer': {'email': '', 'fullname': b'', 'name': b''},
    ...     'date': date,
    ...     'committer_date': invalid_date3,
    ... }])
    []
+
+    """
+    good_revisions: List = []
+    for rev in revisions:
+        rev = _fix_revision_pypi_empty_string(rev)
+        rev = _fix_revision_transplant_source(rev)
+        if not _check_revision_date(rev):
+            logging.warning('Excluding revision (invalid date): %r', rev)
+            continue
+        if rev not in good_revisions:
+            good_revisions.append(rev)
+    return good_revisions
+
+
+def _fix_origin_visits(visits: List[Dict]) -> List[Dict]:
+    """Adapt origin visits into a list of current storage compatible dicts.
+
    `visit['origin']` is a dict instead of an URL:

-    >>> pprint(fix_objects('origin_visit', [{
+    >>> from pprint import pprint
+    >>> pprint(_fix_origin_visits([{
    ...     'origin': {'url': 'http://foo'},
    ...     'type': 'git',
    ... }]))
-    [{'origin': 'http://foo', 'type': 'git'}]
+    [{'metadata': None, 'origin': 'http://foo', 'type': 'git'}]

    `visit['type']` is missing , but `origin['visit']['type']` exists:

-    >>> pprint(fix_objects('origin_visit', [
+    >>> pprint(_fix_origin_visits([
    ...     {'origin': {'type': 'hg', 'url': 'http://foo'}
    ... }]))
-    [{'origin': 'http://foo', 'type': 'hg'}]
-    """  # noqa
-    if object_type == 'revision':
-        objects = _fix_revisions(objects)
-    elif object_type == 'origin_visit':
-        objects = _fix_origin_visits(objects)
-    return objects
+    [{'metadata': None, 'origin': 'http://foo', 'type': 'hg'}]
+
+    """
+    good_visits = []
+    for visit in visits:
+        visit = visit.copy()
+        if 'type' not in visit:
+            if isinstance(visit['origin'], dict) and 'type' in visit['origin']:
+                # Very old version of the schema: visits did not have a type,
+                # but their 'origin' field was a dict with a 'type' key.
+                visit['type'] = visit['origin']['type']
+            else:
+                # Very very old version of the schema: 'type' is missing,
+                # so there is nothing we can do to fix it.
+                raise ValueError('Got an origin_visit too old to be replayed.')
+        if isinstance(visit['origin'], dict):
+            # Old version of the schema: visit['origin'] was a dict.
+            visit['origin'] = visit['origin']['url']
+        if 'metadata' not in visit:
+            visit['metadata'] = None
+        good_visits.append(visit)
+    return good_visits


def collision_aware_content_add(
        content_add_fn: Callable[[Iterable[Any]], None],
        contents: List[BaseContent]) -> None:
    """Add contents to storage. If a hash collision is detected, an error is
    logged. Then this adds the other non colliding contents to the storage.

    Args:
        content_add_fn: Storage content callable
        contents: List of contents or skipped contents to add to storage

    """
    if not contents:
        return
    colliding_content_hashes: List[Dict[str, Any]] = []
    while True:
        try:
-            content_add_fn(c.to_dict() for c in contents)
+            content_add_fn(contents)
        except HashCollision as e:
            algo, hash_id, colliding_hashes = e.args
            hash_id = hash_to_hex(hash_id)
            colliding_content_hashes.append({
                'algo': algo,
                'hash': hash_to_hex(hash_id),
                'objects': [{k: hash_to_hex(v) for k, v in collision.items()}
                            for collision in colliding_hashes]
            })
            # Drop the colliding contents from the transaction
            contents = [c for c in contents
                        if c.hashes() not in colliding_hashes]
        else:
            # Successfully added contents, we are done
            break
    if colliding_content_hashes:
        for collision in colliding_content_hashes:
            logger.error('Collision detected: %(collision)s', {
                'collision': collision
            })


-def _insert_objects(object_type, objects, storage):
-    objects = fix_objects(object_type, objects)
+def _insert_objects(object_type: str, objects: List[Dict], storage) -> None:
+    """Insert objects of type object_type in the storage.
+
+    """
    if object_type == 'content':
-        contents, skipped_contents = [], []
+        contents: List[BaseContent] = []
+        skipped_contents: List[BaseContent] = []
        for content in objects:
            c = BaseContent.from_dict(content)
            if isinstance(c, SkippedContent):
                skipped_contents.append(c)
            else:
                contents.append(c)

        collision_aware_content_add(
            storage.skipped_content_add, skipped_contents)
        collision_aware_content_add(
            storage.content_add_metadata, contents)
-    elif object_type in ('directory', 'revision', 'release',
-                         'snapshot', 'origin'):
-        # TODO: split batches that are too large for the storage
-        # to handle?
-        method = getattr(storage, object_type + '_add')
-        method(objects)
+    elif object_type == 'revision':
+        storage.revision_add(
+            Revision.from_dict(r) for r in _fix_revisions(objects)
+        )
    elif object_type == 'origin_visit':

vlorentz (not done): I don't understand this comment

ardumont (done): upserts accepts dict for now, it's D2813 again.

-        for visit in objects:
-            storage.origin_add_one({'url': visit['origin']})
-            if 'metadata' not in visit:
-                visit['metadata'] = None
-        storage.origin_visit_upsert(objects)
+        visits = _fix_origin_visits(objects)
+        storage.origin_add(Origin(url=v['origin']) for v in visits)
+        # FIXME: Should be List[OriginVisit], working on fixing
+        # swh.storage.origin_visit_upsert (D2813)
+        storage.origin_visit_upsert(visits)
+    elif object_type in ('directory', 'release', 'snapshot', 'origin'):
+        method = getattr(storage, object_type + '_add')
+        method(object_converter_fn[object_type](o) for o in objects)
    else:
        logger.warning('Received a series of %s, this should not happen',
                       object_type)


def is_hash_in_bytearray(hash_, array, nb_hashes, hash_size=SHA1_SIZE):
    """
    Checks if the given hash is in the provided `array`. The array must be

[... 245 lines not shown ...]