Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/replay.py
Show All 15 Lines | |||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
def process_replay_objects(all_objects, *, storage):
    """Insert every batch of replayed journal objects into ``storage``.

    Args:
        all_objects: mapping whose keys are object types and whose values
            are the collections of objects of that type.
        storage: storage handle, passed through unchanged to
            ``_insert_objects`` (keyword-only).
    """
    for kind, batch in all_objects.items():
        _insert_objects(kind, batch, storage)
def _fix_revisions(revisions): | |||||
for rev in revisions: | |||||
# PyPI loader failed to encode empty strings as bytes, see: | |||||
# swh:1:rev:8f0095ee0664867055d03de9bcc8f95b91d8a2b9 | |||||
# or https://forge.softwareheritage.org/D1772 | |||||
if rev['author'].get('email') == '': | |||||
rev['author']['email'] = b'' | |||||
if rev['author'].get('name') == '': | |||||
rev['author']['name'] = b'' | |||||
if rev['committer'].get('email') == '': | |||||
rev['committer']['email'] = b'' | |||||
if rev['committer'].get('name') == '': | |||||
rev['committer']['name'] = b'' | |||||
def _fix_origin_visits(visits): | |||||
for visit in visits: | |||||
if isinstance(visit['origin'], str): | |||||
# note that it will crash with the pg and | |||||
# in-mem storages if the origin is not already known, | |||||
# but there is no other choice because we can't add an | |||||
# origin without knowing its type. Non-pg storages | |||||
# don't use a numeric FK internally, | |||||
visit['origin'] = {'url': visit['origin']} | |||||
else: | |||||
if 'type' not in visit: | |||||
visit['type'] = visit['origin']['type'] | |||||
def fix_objects(object_type, objects):
    """Converts a possibly old object from the journal to its current
    expected format.

    The objects are fixed in place and the same ``objects`` container is
    returned. Object types without a registered fixer are returned
    untouched.

    List of conversions:

    Empty author name/email in PyPI releases:

    >>> from pprint import pprint
    >>> pprint(fix_objects('revision', [{
    ...     'author': {'email': '', 'fullname': b'', 'name': ''},
    ...     'committer': {'email': '', 'fullname': b'', 'name': ''},
    ... }]))
    [{'author': {'email': b'', 'fullname': b'', 'name': b''},
      'committer': {'email': b'', 'fullname': b'', 'name': b''}}]

    `visit['origin']` is an URL instead of a dict:

    >>> fix_objects('origin_visit', [{'origin': 'http://foo'}])
    [{'origin': {'url': 'http://foo'}}]

    `visit['type']` is missing, but `visit['origin']['type']` exists:

    >>> pprint(fix_objects(
    ...     'origin_visit',
    ...     [{'origin': {'type': 'hg', 'url': 'http://foo'}}]))
    [{'origin': {'type': 'hg', 'url': 'http://foo'}, 'type': 'hg'}]
    """
    # One in-place fixer per object type that has known legacy formats.
    fixers = {
        'revision': _fix_revisions,
        'origin_visit': _fix_origin_visits,
    }
    fixer = fixers.get(object_type)
    if fixer is not None:
        fixer(objects)
    return objects
def _insert_objects(object_type, objects, storage):
    """Insert a batch of objects of a single type into ``storage``.

    The batch is first passed through ``fix_objects`` so that objects in an
    old journal format are upgraded (in place) before insertion; no
    format fixing is repeated here.

    Dispatch by ``object_type``:

    * ``'content'``: each object is sent on its own through
      ``storage.content_add_metadata``; a ``HashCollision`` is logged as an
      error and the loop carries on with the next object.
    * ``'directory'``, ``'revision'``, ``'release'``, ``'snapshot'``,
      ``'origin'``: the whole batch is handed to ``storage.<type>_add``.
    * ``'origin_visit'``: every visit whose origin dict has a ``'type'``
      gets that origin added with ``storage.origin_add_one``, then the
      whole batch goes to ``storage.origin_visit_upsert``.
    * anything else: a warning is logged and nothing is inserted.

    Args:
        object_type: name of the object type, as listed above.
        objects: the batch of object dicts; may be mutated by the fixers.
        storage: object exposing the storage methods named above.
    """
    fix_objects(object_type, objects)

    if object_type == 'content':
        # TODO: insert 'content' in batches
        for object_ in objects:
            try:
                storage.content_add_metadata([object_])
            except HashCollision as e:
                logger.error('Hash collision: %s', e.args)
    elif object_type in ('directory', 'revision', 'release',
                         'snapshot', 'origin'):
        # TODO: split batches that are too large for the storage
        # to handle?
        method = getattr(storage, object_type + '_add')
        method(objects)
    elif object_type == 'origin_visit':
        for visit in objects:
            # An origin can only be added when its type is known; visits
            # converted from a bare URL carry only {'url': ...}.
            if 'type' in visit['origin']:
                storage.origin_add_one(visit['origin'])
        storage.origin_visit_upsert(objects)
    else:
        logger.warning('Received a series of %s, this should not happen',
                       object_type)
def is_hash_in_bytearray(hash_, array, nb_hashes, hash_size=SHA1_SIZE): | def is_hash_in_bytearray(hash_, array, nb_hashes, hash_size=SHA1_SIZE): | ||||
""" | """ | ||||
▲ Show 20 Lines • Show All 141 Lines • Show Last 20 Lines |