Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/replay.py
Show First 20 Lines • Show All 83 Lines • ▼ Show 20 Lines | for rev in revisions: | ||||
good_revisions.append(rev) | good_revisions.append(rev) | ||||
return good_revisions | return good_revisions | ||||
def _fix_origin_visits(visits): | def _fix_origin_visits(visits): | ||||
good_visits = [] | good_visits = [] | ||||
for visit in visits: | for visit in visits: | ||||
visit = visit.copy() | visit = visit.copy() | ||||
if isinstance(visit['origin'], str): | |||||
# note that it will crash with the pg and | |||||
# in-mem storages if the origin is not already known, | |||||
# but there is no other choice because we can't add an | |||||
# origin without knowing its type. Non-pg storages | |||||
# don't use a numeric FK internally, | |||||
visit['origin'] = {'url': visit['origin']} | |||||
else: | |||||
if 'type' not in visit: | if 'type' not in visit: | ||||
if isinstance(visit['origin'], dict) and 'type' in visit['origin']: | |||||
# Very old version of the schema: visits did not have a type, | |||||
# but their 'origin' field was a dict with a 'type' key. | |||||
visit['type'] = visit['origin']['type'] | visit['type'] = visit['origin']['type'] | ||||
else: | |||||
# Very very old version of the schema: 'type' is missing, | |||||
# so there is nothing we can do to fix it. | |||||
raise ValueError('Got an origin_visit too old to be replayed.') | |||||
if isinstance(visit['origin'], dict): | |||||
# Old version of the schema: visit['origin'] was a dict. | |||||
visit['origin'] = visit['origin']['url'] | |||||
good_visits.append(visit) | good_visits.append(visit) | ||||
return good_visits | return good_visits | ||||
def fix_objects(object_type, objects): | def fix_objects(object_type, objects): | ||||
"""Converts a possibly old object from the journal to its current | """Converts a possibly old object from the journal to its current | ||||
expected format. | expected format. | ||||
▲ Show 20 Lines • Show All 68 Lines • ▼ Show 20 Lines | def fix_objects(object_type, objects): | ||||
... 'author': {'email': '', 'fullname': b'', 'name': b''}, | ... 'author': {'email': '', 'fullname': b'', 'name': b''}, | ||||
... 'committer': {'email': '', 'fullname': b'', 'name': b''}, | ... 'committer': {'email': '', 'fullname': b'', 'name': b''}, | ||||
... 'date': date, | ... 'date': date, | ||||
... 'committer_date': invalid_date3, | ... 'committer_date': invalid_date3, | ||||
... }]) | ... }]) | ||||
[] | [] | ||||
`visit['origin']` is a dict instead of a URL:
>>> fix_objects('origin_visit', [{'origin': 'http://foo'}]) | >>> pprint(fix_objects('origin_visit', [{ | ||||
[{'origin': {'url': 'http://foo'}}] | ... 'origin': {'url': 'http://foo'}, | ||||
... 'type': 'git', | |||||
... }])) | |||||
[{'origin': 'http://foo', 'type': 'git'}] | |||||
`visit['type']` is missing, but `visit['origin']['type']` exists:
>>> pprint(fix_objects( | >>> pprint(fix_objects('origin_visit', [ | ||||
... 'origin_visit', | ... {'origin': {'type': 'hg', 'url': 'http://foo'} | ||||
... [{'origin': {'type': 'hg', 'url': 'http://foo'}}])) | ... }])) | ||||
[{'origin': {'type': 'hg', 'url': 'http://foo'}, 'type': 'hg'}] | [{'origin': 'http://foo', 'type': 'hg'}] | ||||
""" # noqa | """ # noqa | ||||
if object_type == 'revision': | if object_type == 'revision': | ||||
objects = _fix_revisions(objects) | objects = _fix_revisions(objects) | ||||
elif object_type == 'origin_visit': | elif object_type == 'origin_visit': | ||||
objects = _fix_origin_visits(objects) | objects = _fix_origin_visits(objects) | ||||
return objects | return objects | ||||
Show All 10 Lines | def _insert_objects(object_type, objects, storage): | ||||
elif object_type in ('directory', 'revision', 'release', | elif object_type in ('directory', 'revision', 'release', | ||||
'snapshot', 'origin'): | 'snapshot', 'origin'): | ||||
# TODO: split batches that are too large for the storage | # TODO: split batches that are too large for the storage | ||||
# to handle? | # to handle? | ||||
method = getattr(storage, object_type + '_add') | method = getattr(storage, object_type + '_add') | ||||
method(objects) | method(objects) | ||||
elif object_type == 'origin_visit': | elif object_type == 'origin_visit': | ||||
for visit in objects: | for visit in objects: | ||||
storage.origin_add_one(visit['origin']) | storage.origin_add_one({'url': visit['origin']}) | ||||
if 'metadata' not in visit: | if 'metadata' not in visit: | ||||
visit['metadata'] = None | visit['metadata'] = None | ||||
storage.origin_visit_upsert(objects) | storage.origin_visit_upsert(objects) | ||||
else: | else: | ||||
logger.warning('Received a series of %s, this should not happen', | logger.warning('Received a series of %s, this should not happen', | ||||
object_type) | object_type) | ||||
▲ Show 20 Lines • Show All 161 Lines • Show Last 20 Lines |