Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/package/loader.py
Show First 20 Lines • Show All 400 Lines • ▼ Show 20 Lines | class PackageLoader(BaseLoader, Generic[TPackageInfo]): | ||||
def finalize_visit( | def finalize_visit( | ||||
self, | self, | ||||
*, | *, | ||||
snapshot: Optional[Snapshot], | snapshot: Optional[Snapshot], | ||||
visit: OriginVisit, | visit: OriginVisit, | ||||
status_visit: str, | status_visit: str, | ||||
status_load: str, | status_load: str, | ||||
failed_branches: List[str], | failed_branches: List[str], | ||||
errors: Optional[List[str]] = None, | |||||
) -> Dict[str, Any]: | ) -> Dict[str, Any]: | ||||
"""Finalize the visit: | """Finalize the visit: | ||||
- flush any unflushed data to storage | - flush any unflushed data to storage | ||||
- update origin visit's status | - update origin visit's status | ||||
- return the task's status | - return the task's status | ||||
""" | """ | ||||
▲ Show 20 Lines • Show All 107 Lines • ▼ Show 20 Lines | def load(self) -> Dict: | ||||
logger.exception("Failed to get previous state for %s", self.url) | logger.exception("Failed to get previous state for %s", self.url) | ||||
sentry_sdk.capture_exception(e) | sentry_sdk.capture_exception(e) | ||||
return self.finalize_visit( | return self.finalize_visit( | ||||
snapshot=snapshot, | snapshot=snapshot, | ||||
visit=visit, | visit=visit, | ||||
failed_branches=failed_branches, | failed_branches=failed_branches, | ||||
status_visit="failed", | status_visit="failed", | ||||
status_load="failed", | status_load="failed", | ||||
errors=[str(e)], | |||||
) | ) | ||||
load_exceptions: List[Exception] = [] | load_exceptions: List[Exception] = [] | ||||
# Get the list of all version names | # Get the list of all version names | ||||
try: | try: | ||||
versions = self.get_versions() | versions = self.get_versions() | ||||
except NotFound: | except NotFound as e: | ||||
return self.finalize_visit( | return self.finalize_visit( | ||||
snapshot=snapshot, | snapshot=snapshot, | ||||
visit=visit, | visit=visit, | ||||
failed_branches=failed_branches, | failed_branches=failed_branches, | ||||
status_visit="not_found", | status_visit="not_found", | ||||
status_load="failed", | status_load="failed", | ||||
errors=[str(e)], | |||||
) | ) | ||||
except Exception: | except Exception as e: | ||||
return self.finalize_visit( | return self.finalize_visit( | ||||
snapshot=snapshot, | snapshot=snapshot, | ||||
visit=visit, | visit=visit, | ||||
failed_branches=failed_branches, | failed_branches=failed_branches, | ||||
status_visit="failed", | status_visit="failed", | ||||
status_load="failed", | status_load="failed", | ||||
errors=[str(e)], | |||||
) | ) | ||||
# Get the metadata of each version's package | # Get the metadata of each version's package | ||||
packages_info: List[Tuple[str, str, TPackageInfo]] = [ | packages_info: List[Tuple[str, str, TPackageInfo]] = [ | ||||
(version, branch_name, p_info) | (version, branch_name, p_info) | ||||
for version in versions | for version in versions | ||||
for (branch_name, p_info) in self.get_package_info(version) | for (branch_name, p_info) in self.get_package_info(version) | ||||
] | ] | ||||
Show All 9 Lines | def load(self) -> Dict: | ||||
last_snapshot_targets = { | last_snapshot_targets = { | ||||
branch.target for branch in last_snapshot.branches.values() | branch.target for branch in last_snapshot.branches.values() | ||||
} | } | ||||
new_extids: Set[ExtID] = set() | new_extids: Set[ExtID] = set() | ||||
tmp_revisions: Dict[str, List[Tuple[str, Sha1Git]]] = { | tmp_revisions: Dict[str, List[Tuple[str, Sha1Git]]] = { | ||||
version: [] for version in versions | version: [] for version in versions | ||||
} | } | ||||
errors = [] | |||||
for (version, branch_name, p_info) in packages_info: | for (version, branch_name, p_info) in packages_info: | ||||
logger.debug("package_info: %s", p_info) | logger.debug("package_info: %s", p_info) | ||||
# Check if the package was already loaded, using its ExtID | # Check if the package was already loaded, using its ExtID | ||||
revision_id = self.resolve_revision_from_extids( | revision_id = self.resolve_revision_from_extids( | ||||
known_extids, p_info, last_snapshot_targets | known_extids, p_info, last_snapshot_targets | ||||
) | ) | ||||
Show All 9 Lines | def load(self) -> Dict: | ||||
p_info, revision_id, directory_id | p_info, revision_id, directory_id | ||||
) | ) | ||||
self.storage.flush() | self.storage.flush() | ||||
status_load = "eventful" | status_load = "eventful" | ||||
except Exception as e: | except Exception as e: | ||||
self.storage.clear_buffers() | self.storage.clear_buffers() | ||||
load_exceptions.append(e) | load_exceptions.append(e) | ||||
sentry_sdk.capture_exception(e) | sentry_sdk.capture_exception(e) | ||||
logger.exception( | error = f"Failed to load branch {branch_name} for {self.url}" | ||||
"Failed loading branch %s for %s", branch_name, self.url | logger.exception(error) | ||||
) | |||||
failed_branches.append(branch_name) | failed_branches.append(branch_name) | ||||
errors.append(f"{error}: {e}") | |||||
continue | continue | ||||
if revision_id is None: | if revision_id is None: | ||||
continue | continue | ||||
partial_extid = p_info.extid() | partial_extid = p_info.extid() | ||||
if partial_extid is not None: | if partial_extid is not None: | ||||
(extid_type, extid) = partial_extid | (extid_type, extid) = partial_extid | ||||
Show All 12 Lines | def load(self) -> Dict: | ||||
if not tmp_revisions: | if not tmp_revisions: | ||||
# We could not load any revisions; fail completely | # We could not load any revisions; fail completely | ||||
return self.finalize_visit( | return self.finalize_visit( | ||||
snapshot=snapshot, | snapshot=snapshot, | ||||
visit=visit, | visit=visit, | ||||
failed_branches=failed_branches, | failed_branches=failed_branches, | ||||
status_visit="failed", | status_visit="failed", | ||||
status_load="failed", | status_load="failed", | ||||
errors=errors, | |||||
) | ) | ||||
try: | try: | ||||
# Retrieve the default release version (the "latest" one) | # Retrieve the default release version (the "latest" one) | ||||
default_version = self.get_default_version() | default_version = self.get_default_version() | ||||
logger.debug("default version: %s", default_version) | logger.debug("default version: %s", default_version) | ||||
# Retrieve extra branches | # Retrieve extra branches | ||||
extra_branches = self.extra_branches() | extra_branches = self.extra_branches() | ||||
logger.debug("extra branches: %s", extra_branches) | logger.debug("extra branches: %s", extra_branches) | ||||
snapshot = self._load_snapshot( | snapshot = self._load_snapshot( | ||||
default_version, tmp_revisions, extra_branches | default_version, tmp_revisions, extra_branches | ||||
) | ) | ||||
self.storage.flush() | self.storage.flush() | ||||
except Exception as e: | except Exception as e: | ||||
logger.exception("Failed to build snapshot for origin %s", self.url) | error = f"Failed to build snapshot for origin {self.url}" | ||||
logger.exception(error) | |||||
errors.append(f"{error}: {e}") | |||||
sentry_sdk.capture_exception(e) | sentry_sdk.capture_exception(e) | ||||
status_visit = "failed" | status_visit = "failed" | ||||
status_load = "failed" | status_load = "failed" | ||||
if snapshot: | if snapshot: | ||||
try: | try: | ||||
metadata_objects = self.build_extrinsic_snapshot_metadata(snapshot.id) | metadata_objects = self.build_extrinsic_snapshot_metadata(snapshot.id) | ||||
self._load_metadata_objects(metadata_objects) | self._load_metadata_objects(metadata_objects) | ||||
except Exception as e: | except Exception as e: | ||||
logger.exception( | error = f"Failed to load extrinsic snapshot metadata for {self.url}" | ||||
"Failed to load extrinsic snapshot metadata for %s", self.url | logger.exception(error) | ||||
) | errors.append(f"{error}: {e}") | ||||
sentry_sdk.capture_exception(e) | sentry_sdk.capture_exception(e) | ||||
status_visit = "partial" | status_visit = "partial" | ||||
status_load = "failed" | status_load = "failed" | ||||
try: | try: | ||||
metadata_objects = self.build_extrinsic_origin_metadata() | metadata_objects = self.build_extrinsic_origin_metadata() | ||||
self._load_metadata_objects(metadata_objects) | self._load_metadata_objects(metadata_objects) | ||||
except Exception as e: | except Exception as e: | ||||
logger.exception( | error = f"Failed to load extrinsic origin metadata for {self.url}" | ||||
"Failed to load extrinsic origin metadata for %s", self.url | logger.exception(error) | ||||
) | errors.append(f"{error}: {e}") | ||||
sentry_sdk.capture_exception(e) | sentry_sdk.capture_exception(e) | ||||
status_visit = "partial" | status_visit = "partial" | ||||
status_load = "failed" | status_load = "failed" | ||||
self._load_extids(new_extids) | self._load_extids(new_extids) | ||||
return self.finalize_visit( | return self.finalize_visit( | ||||
snapshot=snapshot, | snapshot=snapshot, | ||||
visit=visit, | visit=visit, | ||||
failed_branches=failed_branches, | failed_branches=failed_branches, | ||||
status_visit=status_visit, | status_visit=status_visit, | ||||
status_load=status_load, | status_load=status_load, | ||||
errors=errors, | |||||
) | ) | ||||
def _load_directory( | def _load_directory( | ||||
self, dl_artifacts: List[Tuple[str, Mapping[str, Any]]], tmpdir: str | self, dl_artifacts: List[Tuple[str, Mapping[str, Any]]], tmpdir: str | ||||
) -> Tuple[str, from_disk.Directory]: | ) -> Tuple[str, from_disk.Directory]: | ||||
uncompressed_path = self.uncompress(dl_artifacts, dest=tmpdir) | uncompressed_path = self.uncompress(dl_artifacts, dest=tmpdir) | ||||
logger.debug("uncompressed_path: %s", uncompressed_path) | logger.debug("uncompressed_path: %s", uncompressed_path) | ||||
▲ Show 20 Lines • Show All 297 Lines • Show Last 20 Lines |