Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/replay.py
Show All 20 Lines | from tenacity import ( | ||||
wait_random_exponential, | wait_random_exponential, | ||||
) | ) | ||||
from swh.core.statsd import statsd | from swh.core.statsd import statsd | ||||
from swh.model.identifiers import normalize_timestamp | from swh.model.identifiers import normalize_timestamp | ||||
from swh.model.hashutil import hash_to_hex | from swh.model.hashutil import hash_to_hex | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
BaseContent, BaseModel, Content, Directory, Origin, Revision, | BaseContent, BaseModel, Content, Directory, Origin, OriginVisit, Revision, | ||||
SHA1_SIZE, SkippedContent, Snapshot, Release | SHA1_SIZE, SkippedContent, Snapshot, Release | ||||
) | ) | ||||
from swh.objstorage.objstorage import ( | from swh.objstorage.objstorage import ( | ||||
ID_HASH_ALGO, ObjNotFoundError, ObjStorage, | ID_HASH_ALGO, ObjNotFoundError, ObjStorage, | ||||
) | ) | ||||
from swh.storage import HashCollision | from swh.storage import HashCollision | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
GRAPH_OPERATIONS_METRIC = "swh_graph_replayer_operations_total" | GRAPH_OPERATIONS_METRIC = "swh_graph_replayer_operations_total" | ||||
GRAPH_DURATION_METRIC = "swh_graph_replayer_duration_seconds" | GRAPH_DURATION_METRIC = "swh_graph_replayer_duration_seconds" | ||||
CONTENT_OPERATIONS_METRIC = "swh_content_replayer_operations_total" | CONTENT_OPERATIONS_METRIC = "swh_content_replayer_operations_total" | ||||
CONTENT_RETRY_METRIC = "swh_content_replayer_retries_total" | CONTENT_RETRY_METRIC = "swh_content_replayer_retries_total" | ||||
CONTENT_BYTES_METRIC = "swh_content_replayer_bytes" | CONTENT_BYTES_METRIC = "swh_content_replayer_bytes" | ||||
CONTENT_DURATION_METRIC = "swh_content_replayer_duration_seconds" | CONTENT_DURATION_METRIC = "swh_content_replayer_duration_seconds" | ||||
object_converter_fn: Dict[str, Callable[[Dict], BaseModel]] = { | object_converter_fn: Dict[str, Callable[[Dict], BaseModel]] = { | ||||
'origin': Origin.from_dict, | 'origin': Origin.from_dict, | ||||
'origin_visit': OriginVisit.from_dict, | |||||
'snapshot': Snapshot.from_dict, | 'snapshot': Snapshot.from_dict, | ||||
'revision': Revision.from_dict, | 'revision': Revision.from_dict, | ||||
'release': Release.from_dict, | 'release': Release.from_dict, | ||||
'directory': Directory.from_dict, | 'directory': Directory.from_dict, | ||||
'content': Content.from_dict, | 'content': Content.from_dict, | ||||
'skipped_content': SkippedContent.from_dict, | 'skipped_content': SkippedContent.from_dict, | ||||
} | } | ||||
▲ Show 20 Lines • Show All 143 Lines • ▼ Show 20 Lines | for rev in revisions: | ||||
if not _check_revision_date(rev): | if not _check_revision_date(rev): | ||||
logging.warning('Excluding revision (invalid date): %r', rev) | logging.warning('Excluding revision (invalid date): %r', rev) | ||||
continue | continue | ||||
if rev not in good_revisions: | if rev not in good_revisions: | ||||
good_revisions.append(rev) | good_revisions.append(rev) | ||||
return good_revisions | return good_revisions | ||||
def _fix_origin_visits(visits: List[Dict]) -> List[Dict]: | def _fix_origin_visit(visit: Dict) -> OriginVisit: | ||||
"""Adapt origin visits into a list of current storage compatible dicts. | """Adapt origin visits into a list of current storage compatible | ||||
OriginVisits. | |||||
`visit['origin']` is a dict instead of an URL: | `visit['origin']` is a dict instead of an URL: | ||||
>>> from datetime import datetime, timezone | |||||
>>> from pprint import pprint | >>> from pprint import pprint | ||||
>>> pprint(_fix_origin_visits([{ | >>> date = datetime(2020, 2, 27, 14, 39, 19, tzinfo=timezone.utc) | ||||
>>> pprint(_fix_origin_visit({ | |||||
... 'origin': {'url': 'http://foo'}, | ... 'origin': {'url': 'http://foo'}, | ||||
... 'date': date, | |||||
... 'type': 'git', | ... 'type': 'git', | ||||
... }])) | ... 'status': 'ongoing', | ||||
[{'metadata': None, 'origin': 'http://foo', 'type': 'git'}] | ... 'snapshot': None, | ||||
... }).to_dict()) | |||||
{'date': datetime.datetime(2020, 2, 27, 14, 39, 19, tzinfo=datetime.timezone.utc), | |||||
'metadata': None, | |||||
'origin': 'http://foo', | |||||
'snapshot': None, | |||||
'status': 'ongoing', | |||||
'type': 'git'} | |||||
`visit['type']` is missing , but `origin['visit']['type']` exists: | `visit['type']` is missing , but `origin['visit']['type']` exists: | ||||
>>> pprint(_fix_origin_visits([ | >>> pprint(_fix_origin_visit( | ||||
... {'origin': {'type': 'hg', 'url': 'http://foo'} | ... {'origin': {'type': 'hg', 'url': 'http://foo'}, | ||||
... }])) | ... 'date': date, | ||||
[{'metadata': None, 'origin': 'http://foo', 'type': 'hg'}] | ... 'status': 'ongoing', | ||||
... 'snapshot': None, | |||||
... }).to_dict()) | |||||
{'date': datetime.datetime(2020, 2, 27, 14, 39, 19, tzinfo=datetime.timezone.utc), | |||||
'metadata': None, | |||||
'origin': 'http://foo', | |||||
'snapshot': None, | |||||
'status': 'ongoing', | |||||
'type': 'hg'} | |||||
""" | """ # noqa | ||||
good_visits = [] | |||||
for visit in visits: | |||||
visit = visit.copy() | visit = visit.copy() | ||||
if 'type' not in visit: | if 'type' not in visit: | ||||
if isinstance(visit['origin'], dict) and 'type' in visit['origin']: | if isinstance(visit['origin'], dict) and 'type' in visit['origin']: | ||||
# Very old version of the schema: visits did not have a type, | # Very old version of the schema: visits did not have a type, | ||||
# but their 'origin' field was a dict with a 'type' key. | # but their 'origin' field was a dict with a 'type' key. | ||||
visit['type'] = visit['origin']['type'] | visit['type'] = visit['origin']['type'] | ||||
else: | else: | ||||
# Very very old version of the schema: 'type' is missing, | # Very very old version of the schema: 'type' is missing, | ||||
# so there is nothing we can do to fix it. | # so there is nothing we can do to fix it. | ||||
raise ValueError('Got an origin_visit too old to be replayed.') | raise ValueError('Got an origin_visit too old to be replayed.') | ||||
if isinstance(visit['origin'], dict): | if isinstance(visit['origin'], dict): | ||||
# Old version of the schema: visit['origin'] was a dict. | # Old version of the schema: visit['origin'] was a dict. | ||||
visit['origin'] = visit['origin']['url'] | visit['origin'] = visit['origin']['url'] | ||||
if 'metadata' not in visit: | if 'metadata' not in visit: | ||||
visit['metadata'] = None | visit['metadata'] = None | ||||
good_visits.append(visit) | return OriginVisit.from_dict(visit) | ||||
return good_visits | |||||
def collision_aware_content_add( | def collision_aware_content_add( | ||||
content_add_fn: Callable[[Iterable[Any]], None], | content_add_fn: Callable[[Iterable[Any]], None], | ||||
contents: List[BaseContent]) -> None: | contents: List[BaseContent]) -> None: | ||||
"""Add contents to storage. If a hash collision is detected, an error is | """Add contents to storage. If a hash collision is detected, an error is | ||||
logged. Then this adds the other non colliding contents to the storage. | logged. Then this adds the other non colliding contents to the storage. | ||||
▲ Show 20 Lines • Show All 48 Lines • ▼ Show 20 Lines | if object_type == 'content': | ||||
storage.skipped_content_add, skipped_contents) | storage.skipped_content_add, skipped_contents) | ||||
collision_aware_content_add( | collision_aware_content_add( | ||||
storage.content_add_metadata, contents) | storage.content_add_metadata, contents) | ||||
elif object_type == 'revision': | elif object_type == 'revision': | ||||
storage.revision_add( | storage.revision_add( | ||||
Revision.from_dict(r) for r in _fix_revisions(objects) | Revision.from_dict(r) for r in _fix_revisions(objects) | ||||
) | ) | ||||
elif object_type == 'origin_visit': | elif object_type == 'origin_visit': | ||||
visits = _fix_origin_visits(objects) | visits = [_fix_origin_visit(v) for v in objects] | ||||
storage.origin_add(Origin(url=v['origin']) for v in visits) | storage.origin_add(Origin(url=v.origin) for v in visits) | ||||
# FIXME: Should be List[OriginVisit], working on fixing | |||||
# swh.storage.origin_visit_upsert (D2813) | |||||
storage.origin_visit_upsert(visits) | storage.origin_visit_upsert(visits) | ||||
elif object_type in ('directory', 'release', 'snapshot', 'origin'): | elif object_type in ('directory', 'release', 'snapshot', 'origin'): | ||||
method = getattr(storage, object_type + '_add') | method = getattr(storage, object_type + '_add') | ||||
method(object_converter_fn[object_type](o) for o in objects) | method(object_converter_fn[object_type](o) for o in objects) | ||||
else: | else: | ||||
logger.warning('Received a series of %s, this should not happen', | logger.warning('Received a series of %s, this should not happen', | ||||
object_type) | object_type) | ||||
▲ Show 20 Lines • Show All 249 Lines • Show Last 20 Lines |