Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/replay.py
Show First 20 Lines • Show All 62 Lines • ▼ Show 20 Lines | for (object_type, objects) in all_objects.items(): | ||||
tags={'object_type': object_type}): | tags={'object_type': object_type}): | ||||
_insert_objects(object_type, objects, storage) | _insert_objects(object_type, objects, storage) | ||||
statsd.increment(GRAPH_OPERATIONS_METRIC, len(objects), | statsd.increment(GRAPH_OPERATIONS_METRIC, len(objects), | ||||
tags={'object_type': object_type}) | tags={'object_type': object_type}) | ||||
if notify: | if notify: | ||||
notify('WATCHDOG=1') | notify('WATCHDOG=1') | ||||
def _fix_contents( | def _fix_content(content: Dict[str, Any]) -> Dict[str, Any]: | ||||
contents: Iterable[Dict[str, Any]]) -> Iterable[Dict[str, Any]]: | |||||
"""Filters-out invalid 'perms' key that leaked from swh.model.from_disk | """Filters-out invalid 'perms' key that leaked from swh.model.from_disk | ||||
to the journal. | to the journal. | ||||
>>> list(_fix_contents([ | >>> _fix_content({'perms': 0o100644, 'sha1_git': b'foo'}) | ||||
... {'perms': 0o100644, 'sha1_git': b'foo'}, | {'sha1_git': b'foo'} | ||||
... {'sha1_git': b'bar'}, | |||||
... ])) | >>> _fix_content({'sha1_git': b'bar'}) | ||||
[{'sha1_git': b'foo'}, {'sha1_git': b'bar'}] | {'sha1_git': b'bar'} | ||||
""" | """ | ||||
for content in contents: | |||||
content = content.copy() | content = content.copy() | ||||
content.pop('perms', None) | content.pop('perms', None) | ||||
yield content | return content | ||||
def _fix_revision_pypi_empty_string(rev): | def _fix_revision_pypi_empty_string(rev): | ||||
"""PyPI loader failed to encode empty strings as bytes, see: | """PyPI loader failed to encode empty strings as bytes, see: | ||||
swh:1:rev:8f0095ee0664867055d03de9bcc8f95b91d8a2b9 | swh:1:rev:8f0095ee0664867055d03de9bcc8f95b91d8a2b9 | ||||
or https://forge.softwareheritage.org/D1772 | or https://forge.softwareheritage.org/D1772 | ||||
""" | """ | ||||
rev = { | rev = { | ||||
▲ Show 20 Lines • Show All 239 Lines • ▼ Show 20 Lines | |||||
def _insert_objects(object_type: str, objects: List[Dict], storage) -> None: | def _insert_objects(object_type: str, objects: List[Dict], storage) -> None: | ||||
"""Insert objects of type object_type in the storage. | """Insert objects of type object_type in the storage. | ||||
""" | """ | ||||
if object_type == 'content': | if object_type == 'content': | ||||
contents: List[BaseContent] = [] | contents: List[BaseContent] = [] | ||||
skipped_contents: List[BaseContent] = [] | skipped_contents: List[BaseContent] = [] | ||||
for content in _fix_contents(objects): | for content in objects: | ||||
c = BaseContent.from_dict(content) | c = BaseContent.from_dict(_fix_content(content)) | ||||
if isinstance(c, SkippedContent): | if isinstance(c, SkippedContent): | ||||
skipped_contents.append(c) | skipped_contents.append(c) | ||||
else: | else: | ||||
contents.append(c) | contents.append(c) | ||||
collision_aware_content_add( | collision_aware_content_add( | ||||
storage.skipped_content_add, skipped_contents) | storage.skipped_content_add, skipped_contents) | ||||
collision_aware_content_add( | collision_aware_content_add( | ||||
▲ Show 20 Lines • Show All 268 Lines • Show Last 20 Lines |