Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/replay.py
Show First 20 Lines • Show All 61 Lines • ▼ Show 20 Lines | for (object_type, objects) in all_objects.items(): | ||||
with statsd.timed(GRAPH_DURATION_METRIC, | with statsd.timed(GRAPH_DURATION_METRIC, | ||||
tags={'object_type': object_type}): | tags={'object_type': object_type}): | ||||
_insert_objects(object_type, objects, storage) | _insert_objects(object_type, objects, storage) | ||||
statsd.increment(GRAPH_OPERATIONS_METRIC, len(objects), | statsd.increment(GRAPH_OPERATIONS_METRIC, len(objects), | ||||
tags={'object_type': object_type}) | tags={'object_type': object_type}) | ||||
if notify: | if notify: | ||||
notify('WATCHDOG=1') | notify('WATCHDOG=1') | ||||
ardumont: type! | |||||
def _fix_contents( | |||||
contents: Iterable[Dict[str, Any]]) -> Iterable[Dict[str, Any]]: | |||||
"""Filters-out invalid 'perms' key that leaked from swh.model.from_disk | |||||
to the journal. | |||||
>>> list(_fix_contents([ | |||||
... {'perms': 0o100644, 'sha1_git': b'foo'}, | |||||
... {'sha1_git': b'bar'}, | |||||
... ])) | |||||
[{'sha1_git': b'foo'}, {'sha1_git': b'bar'}] | |||||
""" | |||||
for content in contents: | |||||
content = content.copy() | |||||
content.pop('perms', None) | |||||
yield content | |||||
def _fix_revision_pypi_empty_string(rev): | def _fix_revision_pypi_empty_string(rev): | ||||
"""PyPI loader failed to encode empty strings as bytes, see: | """PyPI loader failed to encode empty strings as bytes, see: | ||||
swh:1:rev:8f0095ee0664867055d03de9bcc8f95b91d8a2b9 | swh:1:rev:8f0095ee0664867055d03de9bcc8f95b91d8a2b9 | ||||
or https://forge.softwareheritage.org/D1772 | or https://forge.softwareheritage.org/D1772 | ||||
""" | """ | ||||
rev = { | rev = { | ||||
**rev, | **rev, | ||||
'author': rev['author'].copy(), | 'author': rev['author'].copy(), | ||||
▲ Show 20 Lines • Show All 225 Lines • ▼ Show 20 Lines | |||||
def _insert_objects(object_type: str, objects: List[Dict], storage) -> None: | def _insert_objects(object_type: str, objects: List[Dict], storage) -> None: | ||||
"""Insert objects of type object_type in the storage. | """Insert objects of type object_type in the storage. | ||||
""" | """ | ||||
if object_type == 'content': | if object_type == 'content': | ||||
contents: List[BaseContent] = [] | contents: List[BaseContent] = [] | ||||
skipped_contents: List[BaseContent] = [] | skipped_contents: List[BaseContent] = [] | ||||
for content in objects: | for content in _fix_contents(objects): | ||||
c = BaseContent.from_dict(content) | c = BaseContent.from_dict(content) | ||||
if isinstance(c, SkippedContent): | if isinstance(c, SkippedContent): | ||||
skipped_contents.append(c) | skipped_contents.append(c) | ||||
else: | else: | ||||
contents.append(c) | contents.append(c) | ||||
collision_aware_content_add( | collision_aware_content_add( | ||||
storage.skipped_content_add, skipped_contents) | storage.skipped_content_add, skipped_contents) | ||||
▲ Show 20 Lines • Show All 266 Lines • Show Last 20 Lines |
type!