Changeset View
Standalone View
swh/loader/package/loader.py
# Copyright (C) 2019-2020 The Software Heritage developers | # Copyright (C) 2019-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import datetime | import datetime | ||||
import logging | import logging | ||||
import tempfile | import tempfile | ||||
import os | import os | ||||
from typing import ( | from typing import ( | ||||
Any, Dict, Generator, List, Mapping, Optional, Sequence, Tuple | Any, Dict, Generator, List, Mapping, Optional, Sequence, Tuple | ||||
) | ) | ||||
import attr | import attr | ||||
ardumont: I patched the staging node currently running the functional loader.
But i think we can add this… | |||||
from swh.core.tarball import uncompress | from swh.core.tarball import uncompress | ||||
from swh.core.config import SWHConfig | from swh.core.config import SWHConfig | ||||
from swh.model import from_disk | from swh.model import from_disk | ||||
from swh.model.hashutil import hash_to_hex | from swh.model.hashutil import hash_to_hex | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
BaseModel, Sha1Git, | BaseModel, Sha1Git, | ||||
Content, SkippedContent, Directory, | Content, SkippedContent, Directory, | ||||
▲ Show 20 Lines • Show All 239 Lines • ▼ Show 20 Lines | def load(self) -> Dict: | ||||
Software Heritage archive | Software Heritage archive | ||||
""" | """ | ||||
status_load = 'uneventful' # either: eventful, uneventful, failed | status_load = 'uneventful' # either: eventful, uneventful, failed | ||||
status_visit = 'full' # either: partial, full | status_visit = 'full' # either: partial, full | ||||
tmp_revisions = {} # type: Dict[str, List] | tmp_revisions = {} # type: Dict[str, List] | ||||
snapshot = None | snapshot = None | ||||
def finalize_visit() -> Dict[str, Any]: | |||||
if hasattr(self.storage, 'flush'): | |||||
self.storage.flush() | |||||
self.storage.origin_visit_update( | |||||
origin=self.url, visit_id=visit.visit, status=status_visit, | |||||
snapshot=snapshot and snapshot.id) | |||||
result: Dict[str, Any] = { | |||||
'status': status_load, | |||||
} | |||||
if snapshot: | |||||
result['snapshot_id'] = hash_to_hex(snapshot.id) | |||||
return result | |||||
# Prepare origin and origin_visit | # Prepare origin and origin_visit | ||||
origin = Origin(url=self.url) | origin = Origin(url=self.url) | ||||
try: | try: | ||||
self.storage.origin_add_one(origin) | self.storage.origin_add_one(origin) | ||||
visit = self.storage.origin_visit_add( | visit = self.storage.origin_visit_add( | ||||
self.url, date=self.visit_date, type=self.visit_type) | self.url, date=self.visit_date, type=self.visit_type) | ||||
except Exception: | except Exception: | ||||
logger.exception('Failed to create origin/origin_visit:') | logger.exception('Failed to initialize origin_visit for %s', | ||||
self.url) | |||||
return {'status': 'failed'} | return {'status': 'failed'} | ||||
try: | try: | ||||
last_snapshot = self.last_snapshot() | last_snapshot = self.last_snapshot() | ||||
logger.debug('last snapshot: %s', last_snapshot) | logger.debug('last snapshot: %s', last_snapshot) | ||||
known_artifacts = self.known_artifacts(last_snapshot) | known_artifacts = self.known_artifacts(last_snapshot) | ||||
logger.debug('known artifacts: %s', known_artifacts) | logger.debug('known artifacts: %s', known_artifacts) | ||||
except Exception: | |||||
logger.exception('Failed to get previous state for %s', self.url) | |||||
status_visit = 'partial' | |||||
status_load = 'failed' | |||||
return finalize_visit() | |||||
load_exceptions = [] | |||||
for version in self.get_versions(): # for each | for version in self.get_versions(): # for each | ||||
logger.debug('version: %s', version) | logger.debug('version: %s', version) | ||||
tmp_revisions[version] = [] | tmp_revisions[version] = [] | ||||
# `p_` stands for `package_` | # `p_` stands for `package_` | ||||
for branch_name, p_info in self.get_package_info(version): | for branch_name, p_info in self.get_package_info(version): | ||||
logger.debug('package_info: %s', p_info) | logger.debug('package_info: %s', p_info) | ||||
revision_id = self.resolve_revision_from( | revision_id = self.resolve_revision_from( | ||||
known_artifacts, p_info['raw']) | known_artifacts, p_info['raw']) | ||||
if revision_id is None: | if revision_id is None: | ||||
(revision_id, loaded) = \ | try: | ||||
self._load_revision(p_info, origin) | (revision_id, loaded) = self._load_revision(p_info, | ||||
origin) | |||||
except Exception as e: | |||||
load_exceptions.append(e) | |||||
logger.exception('Failed loading branch %s for %s', | |||||
branch_name, self.url) | |||||
continue | |||||
if loaded: | if loaded: | ||||
status_load = 'eventful' | status_load = 'eventful' | ||||
else: | |||||
status_visit = 'partial' | |||||
if revision_id is None: | if revision_id is None: | ||||
continue | continue | ||||
tmp_revisions[version].append((branch_name, revision_id)) | tmp_revisions[version].append((branch_name, revision_id)) | ||||
except Exception: | if load_exceptions: | ||||
logger.exception('Fail to load %s' % self.url) | |||||
status_visit = 'partial' | status_visit = 'partial' | ||||
if not tmp_revisions: | |||||
# We could not load any revisions; fail completely | |||||
Done Inline ActionsI knew this bothered me for some reason and then it slipped my mind [1] [1] https://sentry.softwareheritage.org/share/issue/c0d1fbfcd865486bb41eea35552f50dd/ status visit can only be ongoing, partial or full... ardumont: I knew this bothered me for some reason and then it slipped my mind [1]
[1] https://sentry. | |||||
Done Inline ActionsFixed in D2870 ardumont: Fixed in D2870 | |||||
status_visit = 'failed' | |||||
status_load = 'failed' | status_load = 'failed' | ||||
finally: | return finalize_visit() | ||||
try: | |||||
# Retrieve the default release version (the "latest" one) | # Retrieve the default release version (the "latest" one) | ||||
default_version = self.get_default_version() | default_version = self.get_default_version() | ||||
logger.debug('default version: %s', default_version) | logger.debug('default version: %s', default_version) | ||||
# Retrieve extra branches | |||||
extra_branches = self.extra_branches() | extra_branches = self.extra_branches() | ||||
logger.debug('extra branches: %s', extra_branches) | logger.debug('extra branches: %s', extra_branches) | ||||
snapshot = self._load_snapshot( | |||||
default_version, tmp_revisions, extra_branches) | snapshot = self._load_snapshot(default_version, tmp_revisions, | ||||
if hasattr(self.storage, 'flush'): | extra_branches) | ||||
self.storage.flush() | |||||
Not Done Inline ActionsI actually don't really like these except Exception statements. I think it would be better to first catch all expected errors, and then fallback on except Exception if it is a unexpected exception (and tag it as unexpected in sentry). For instance, we know some archives will be broken, this would lead to a "expected error". This would allow us to write better tests, since we could test the type of the raised exception. lewo: I actually don't really like these `except Exception` statements. I think it would be better to… | |||||
self.storage.origin_visit_update( | except Exception: | ||||
origin=self.url, visit_id=visit.visit, status=status_visit, | logger.exception('Failed to build snapshot for origin %s', | ||||
snapshot=snapshot and snapshot.id) | self.url) | ||||
result: Dict[str, Any] = { | status_visit = 'partial' | ||||
'status': status_load, | status_load = 'failed' | ||||
} | |||||
if snapshot: | return finalize_visit() | ||||
result['snapshot_id'] = hash_to_hex(snapshot.id) | |||||
return result | |||||
def _load_revision(self, p_info, origin) -> Tuple[Optional[Sha1Git], bool]: | def _load_revision(self, p_info, origin) -> Tuple[Optional[Sha1Git], bool]: | ||||
"""Does all the loading of a revision itself: | """Does all the loading of a revision itself: | ||||
* downloads a package and uncompresses it | * downloads a package and uncompresses it | ||||
* loads it from disk | * loads it from disk | ||||
* adds contents, directories, and revision to self.storage | * adds contents, directories, and revision to self.storage | ||||
* returns (revision_id, loaded) | * returns (revision_id, loaded) | ||||
""" | """ | ||||
with tempfile.TemporaryDirectory() as tmpdir: | with tempfile.TemporaryDirectory() as tmpdir: | ||||
try: | try: | ||||
Not Done Inline ActionsI guess you can remove the loaded return value now; seems like the function always raises an exception when it fails to load now. This will simplify the caller as well. olasd: I guess you can remove the `loaded` return value now; seems like the function always raises an… | |||||
dl_artifacts = self.download_package(p_info, tmpdir) | dl_artifacts = self.download_package(p_info, tmpdir) | ||||
except Exception: | except Exception: | ||||
logger.exception('Unable to retrieve %s', | logger.exception('Unable to retrieve %s', | ||||
p_info) | p_info) | ||||
return (None, False) | return (None, False) | ||||
try: | try: | ||||
uncompressed_path = self.uncompress(dl_artifacts, dest=tmpdir) | uncompressed_path = self.uncompress(dl_artifacts, dest=tmpdir) | ||||
▲ Show 20 Lines • Show All 110 Lines • Show Last 20 Lines |
I patched the staging node currently running the functional loader.
But i think we can add this anyway.